00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include <fastcgi++/transceiver.hpp>
00023
00024 int Fastcgipp::Transceiver::transmit()
00025 {
00026 while(1)
00027 {{
00028 Buffer::SendBlock sendBlock(buffer.requestRead());
00029 if(sendBlock.size)
00030 {
00031 ssize_t sent = write(sendBlock.fd, sendBlock.data, sendBlock.size);
00032 if(sent<0)
00033 {
00034 if(errno==EPIPE || errno==EBADF)
00035 {
00036 freeFd(sendBlock.fd);
00037 sent=sendBlock.size;
00038 }
00039 else if(errno!=EAGAIN) throw Exceptions::SocketWrite(sendBlock.fd, errno);
00040 }
00041
00042 buffer.freeRead(sent);
00043 if(sent!=(ssize_t)sendBlock.size)
00044 break;
00045 }
00046 else
00047 break;
00048 }}
00049
00050 return buffer.empty();
00051 }
00052
00053 void Fastcgipp::Transceiver::Buffer::secureWrite(size_t size, Protocol::FullId id, bool kill)
00054 {
00055 writeIt->end+=size;
00056 if(minBlockSize>(writeIt->data.get()+Chunk::size-writeIt->end) && ++writeIt==chunks.end())
00057 {
00058 chunks.push_back(Chunk());
00059 --writeIt;
00060 }
00061 frames.push(Frame(size, kill, id));
00062 }
00063
00064 bool Fastcgipp::Transceiver::handler()
00065 {
00066 using namespace std;
00067 using namespace Protocol;
00068
00069 bool transmitEmpty=transmit();
00070
00071 int retVal=poll(&pollFds.front(), pollFds.size(), 0);
00072 if(retVal==0)
00073 {
00074 if(transmitEmpty) return true;
00075 else return false;
00076 }
00077 if(retVal<0) throw Exceptions::SocketPoll(errno);
00078
00079 std::vector<pollfd>::iterator pollFd = find_if(pollFds.begin(), pollFds.end(), reventsZero);
00080
00081 if(pollFd->revents&POLLHUP)
00082 {
00083 fdBuffers.erase(pollFd->fd);
00084 pollFds.erase(pollFd);
00085 return false;
00086 }
00087
00088 int fd=pollFd->fd;
00089 if(fd==socket)
00090 {
00091 sockaddr_un addr;
00092 socklen_t addrlen=sizeof(sockaddr_un);
00093 fd=accept(fd, (sockaddr*)&addr, &addrlen);
00094 fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00095
00096 pollFds.push_back(pollfd());
00097 pollFds.back().fd = fd;
00098 pollFds.back().events = POLLIN|POLLHUP;
00099
00100 Message& messageBuffer=fdBuffers[fd].messageBuffer;
00101 messageBuffer.size=0;
00102 messageBuffer.type=0;
00103 }
00104 else if(fd==wakeUpFdIn)
00105 {
00106 char x;
00107 read(wakeUpFdIn, &x, 1);
00108 return false;
00109 }
00110
00111 Message& messageBuffer=fdBuffers[fd].messageBuffer;
00112 Header& headerBuffer=fdBuffers[fd].headerBuffer;
00113
00114 ssize_t actual;
00115
00116 if(!messageBuffer.data)
00117 {
00118
00119 actual=read(fd, (char*)&headerBuffer+messageBuffer.size, sizeof(Header)-messageBuffer.size);
00120 if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00121 if(actual>0) messageBuffer.size+=actual;
00122 if(messageBuffer.size!=sizeof(Header))
00123 {
00124 if(transmitEmpty) return true;
00125 else return false;
00126 }
00127
00128 messageBuffer.data.reset(new char[sizeof(Header)+headerBuffer.getContentLength()+headerBuffer.getPaddingLength()]);
00129 memcpy(static_cast<void*>(messageBuffer.data.get()), static_cast<const void*>(&headerBuffer), sizeof(Header));
00130 }
00131
00132 const Header& header=*(const Header*)messageBuffer.data.get();
00133 size_t needed=header.getContentLength()+header.getPaddingLength()+sizeof(Header)-messageBuffer.size;
00134 actual=read(fd, messageBuffer.data.get()+messageBuffer.size, needed);
00135 if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00136 if(actual>0) messageBuffer.size+=actual;
00137
00138
00139 if(actual==(ssize_t)needed)
00140 {
00141 sendMessage(FullId(headerBuffer.getRequestId(), fd), messageBuffer);
00142 messageBuffer.size=0;
00143 messageBuffer.data.reset();
00144 return false;
00145 }
00146 if(transmitEmpty) return true;
00147 else return false;
00148 }
00149
00150 void Fastcgipp::Transceiver::Buffer::freeRead(size_t size)
00151 {
00152 pRead+=size;
00153 if(pRead>=chunks.begin()->end)
00154 {
00155 if(writeIt==chunks.begin())
00156 {
00157 pRead=writeIt->data.get();
00158 writeIt->end=pRead;
00159 }
00160 else
00161 {
00162 if(writeIt==--chunks.end())
00163 {
00164 chunks.begin()->end=chunks.begin()->data.get();
00165 chunks.splice(chunks.end(), chunks, chunks.begin());
00166 }
00167 else
00168 chunks.pop_front();
00169 pRead=chunks.begin()->data.get();
00170 }
00171 }
00172 if((frames.front().size-=size)==0)
00173 {
00174 if(frames.front().closeFd)
00175 freeFd(frames.front().id.fd);
00176 frames.pop();
00177 }
00178
00179 }
00180
00181 void Fastcgipp::Transceiver::wake()
00182 {
00183 char x;
00184 write(wakeUpFdOut, &x, 1);
00185 }
00186
00187 Fastcgipp::Transceiver::Transceiver(int fd_, boost::function<void(Protocol::FullId, Message)> sendMessage_)
00188 :buffer(pollFds, fdBuffers), sendMessage(sendMessage_), pollFds(2), socket(fd_)
00189 {
00190 socket=fd_;
00191
00192
00193 int socPair[2];
00194 socketpair(AF_UNIX, SOCK_STREAM, 0, socPair);
00195 wakeUpFdIn=socPair[0];
00196 fcntl(wakeUpFdIn, F_SETFL, (fcntl(wakeUpFdIn, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00197 wakeUpFdOut=socPair[1];
00198
00199 fcntl(socket, F_SETFL, (fcntl(socket, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00200 pollFds[0].events = POLLIN|POLLHUP;
00201 pollFds[0].fd = socket;
00202 pollFds[1].events = POLLIN|POLLHUP;
00203 pollFds[1].fd = wakeUpFdIn;
00204 }
00205
00206 Fastcgipp::Exceptions::SocketWrite::SocketWrite(int fd_, int erno_): Socket(fd_, erno_)
00207 {
00208 switch(errno)
00209 {
00210 case EAGAIN:
00211 msg = "The file descriptor has been marked non-blocking (O_NONBLOCK) and the write would block.";
00212 break;
00213
00214 case EBADF:
00215 msg = "The file descriptor is not a valid file descriptor or is not open for writing.";
00216 break;
00217
00218 case EFAULT:
00219 msg = "The buffer is outside your accessible address space.";
00220 break;
00221
00222 case EFBIG:
00223 msg = "An attempt was made to write a file that exceeds the implementation-defined maximum file size or the process’s file size limit, or to write at a position past the maximum allowed offset.";
00224 break;
00225
00226 case EINTR:
00227 msg = "The call was interrupted by a signal before any data was written; see signal(7).";
00228 break;
00229
00230 case EINVAL:
00231 msg = "The file descriptor is attached to an object which is unsuitable for writing; or the file was opened with the O_DIRECT flag, and either the address specified for the buffer, the value specified in count, or the current file offset is not suitably aligned.";
00232 break;
00233
00234 case EIO:
00235 msg = "A low-level I/O error occurred while modifying the inode.";
00236 break;
00237
00238 case ENOSPC:
00239 msg = "The device containing the file referred to by the file descriptor has no room for the data.";
00240 break;
00241
00242 case EPIPE:
00243 msg = "The file descriptor is connected to a pipe or socket whose reading end is closed. When this happens the writing process will also receive a SIGPIPE signal. (Thus, the write return value is seen only if the program catches, blocks or ignores this signal.)";
00244 break;
00245 }
00246 }
00247
00248 Fastcgipp::Exceptions::SocketRead::SocketRead(int fd_, int erno_): Socket(fd_, erno_)
00249 {
00250 switch(errno)
00251 {
00252 case EAGAIN:
00253 msg = "Non-blocking I/O has been selected using O_NONBLOCK and no data was immediately available for reading.";
00254 break;
00255
00256 case EBADF:
00257 msg = "The file descriptor is not valid or is not open for reading.";
00258 break;
00259
00260 case EFAULT:
00261 msg = "The buffer is outside your accessible address space.";
00262 break;
00263
00264 case EINTR:
00265 msg = "The call was interrupted by a signal before any data was written; see signal(7).";
00266 break;
00267
00268 case EINVAL:
00269 msg = "The file descriptor is attached to an object which is unsuitable for reading; or the file was opened with the O_DIRECT flag, and either the address specified in buf, the value specified in count, or the current file offset is not suitably aligned.";
00270 break;
00271
00272 case EIO:
00273 msg = "I/O error. This will happen for example when the process is in a background process group, tries to read from its controlling tty, and either it is ignoring or blocking SIGTTIN or its process group is orphaned. It may also occur when there is a low-level I/O error while reading from a disk or tape.";
00274 break;
00275
00276 case EISDIR:
00277 msg = "The file descriptor refers to a directory.";
00278 break;
00279 }
00280 }
00281
00282 Fastcgipp::Exceptions::SocketPoll::SocketPoll(int erno_): CodedException(0, erno_)
00283 {
00284 switch(errno)
00285 {
00286 case EBADF:
00287 msg = "An invalid file descriptor was given in one of the sets.";
00288 break;
00289
00290 case EFAULT:
00291 msg = "The array given as argument was not contained in the calling program’s address space.";
00292 break;
00293
00294 case EINTR:
00295 msg = "A signal occurred before any requested event; see signal(7).";
00296 break;
00297
00298 case EINVAL:
00299 msg = "The nfds value exceeds the RLIMIT_NOFILE value.";
00300 break;
00301
00302 case ENOMEM:
00303 msg = "There was no space to allocate file descriptor tables.";
00304 break;
00305 }
00306 }
00307
00308 void Fastcgipp::Transceiver::freeFd(int fd, std::vector<pollfd>& pollFds, std::map<int, fdBuffer>& fdBuffers)
00309 {
00310 std::vector<pollfd>::iterator it=std::find_if(pollFds.begin(), pollFds.end(), equalsFd(fd));
00311 if(it != pollFds.end())
00312 {
00313 pollFds.erase(it);
00314 close(fd);
00315 fdBuffers.erase(fd);
00316 }
00317 }