fastcgi++
transceiver.cpp
Go to the documentation of this file.
00001 
00002 /***************************************************************************
00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org]                       *
00004 *                                                                          *
00005 * This file is part of fastcgi++.                                          *
00006 *                                                                          *
00007 * fastcgi++ is free software: you can redistribute it and/or modify it     *
00008 * under the terms of the GNU Lesser General Public License as  published   *
00009 * by the Free Software Foundation, either version 3 of the License, or (at *
00010 * your option) any later version.                                          *
00011 *                                                                          *
00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or    *
00014 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public     *
00015 * License for more details.                                                *
00016 *                                                                          *
00017 * You should have received a copy of the GNU Lesser General Public License *
00018 * along with fastcgi++.  If not, see <http://www.gnu.org/licenses/>.       *
00019 ****************************************************************************/
00020 
00021 
00022 #include <fastcgi++/transceiver.hpp>
00023 
00024 int Fastcgipp::Transceiver::transmit()
00025 {
00026    while(1)
00027    {{
00028       Buffer::SendBlock sendBlock(buffer.requestRead());
00029       if(sendBlock.size)
00030       {
00031          ssize_t sent = write(sendBlock.fd, sendBlock.data, sendBlock.size);
00032          if(sent<0)
00033          {
00034             if(errno==EPIPE || errno==EBADF)
00035             {
00036                freeFd(sendBlock.fd);
00037                sent=sendBlock.size;
00038             }
00039             else if(errno!=EAGAIN) throw Exceptions::SocketWrite(sendBlock.fd, errno);
00040          }
00041 
00042          buffer.freeRead(sent);
00043          if(sent!=(ssize_t)sendBlock.size)
00044             break;
00045       }
00046       else
00047          break;
00048    }}
00049 
00050    return buffer.empty();
00051 }
00052 
00053 void Fastcgipp::Transceiver::Buffer::secureWrite(size_t size, Protocol::FullId id, bool kill)
00054 {
00055    writeIt->end+=size;
00056    if(minBlockSize>(writeIt->data.get()+Chunk::size-writeIt->end) && ++writeIt==chunks.end())
00057    {
00058       chunks.push_back(Chunk());
00059       --writeIt;
00060    }
00061    frames.push(Frame(size, kill, id));
00062 }
00063 
00064 bool Fastcgipp::Transceiver::handler()
00065 {
00066    using namespace std;
00067    using namespace Protocol;
00068 
00069    bool transmitEmpty=transmit();
00070 
00071    int retVal=poll(&pollFds.front(), pollFds.size(), 0);
00072    if(retVal==0)
00073    {
00074       if(transmitEmpty) return true;
00075       else return false;
00076    }
00077    if(retVal<0) throw Exceptions::SocketPoll(errno);
00078    
00079    std::vector<pollfd>::iterator pollFd = find_if(pollFds.begin(), pollFds.end(), reventsZero);
00080 
00081    if(pollFd->revents&POLLHUP)
00082    {
00083       fdBuffers.erase(pollFd->fd);
00084       pollFds.erase(pollFd);
00085       return false;
00086    }
00087    
00088    int fd=pollFd->fd;
00089    if(fd==socket)
00090    {
00091       sockaddr_un addr;
00092       socklen_t addrlen=sizeof(sockaddr_un);
00093       fd=accept(fd, (sockaddr*)&addr, &addrlen);
00094       fcntl(fd, F_SETFL, (fcntl(fd, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00095       
00096       pollFds.push_back(pollfd());
00097       pollFds.back().fd = fd;
00098       pollFds.back().events = POLLIN|POLLHUP;
00099 
00100       Message& messageBuffer=fdBuffers[fd].messageBuffer;
00101       messageBuffer.size=0;
00102       messageBuffer.type=0;
00103    }
00104    else if(fd==wakeUpFdIn)
00105    {
00106       char x;
00107       read(wakeUpFdIn, &x, 1);
00108       return false;
00109    }
00110    
00111    Message& messageBuffer=fdBuffers[fd].messageBuffer;
00112    Header& headerBuffer=fdBuffers[fd].headerBuffer;
00113 
00114    ssize_t actual;
00115    // Are we in the process of recieving some part of a frame?
00116    if(!messageBuffer.data)
00117    {
00118       // Are we recieving a partial header or new?
00119       actual=read(fd, (char*)&headerBuffer+messageBuffer.size, sizeof(Header)-messageBuffer.size);
00120       if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00121       if(actual>0) messageBuffer.size+=actual;
00122       if(messageBuffer.size!=sizeof(Header))
00123       {
00124          if(transmitEmpty) return true;
00125          else return false;
00126       }
00127 
00128       messageBuffer.data.reset(new char[sizeof(Header)+headerBuffer.getContentLength()+headerBuffer.getPaddingLength()]);
00129       memcpy(static_cast<void*>(messageBuffer.data.get()), static_cast<const void*>(&headerBuffer), sizeof(Header));
00130    }
00131 
00132    const Header& header=*(const Header*)messageBuffer.data.get();
00133    size_t needed=header.getContentLength()+header.getPaddingLength()+sizeof(Header)-messageBuffer.size;
00134    actual=read(fd, messageBuffer.data.get()+messageBuffer.size, needed);
00135    if(actual<0 && errno!=EAGAIN) throw Exceptions::SocketRead(fd, errno);
00136    if(actual>0) messageBuffer.size+=actual;
00137 
00138    // Did we recieve a full frame?
00139    if(actual==(ssize_t)needed)
00140    {     
00141       sendMessage(FullId(headerBuffer.getRequestId(), fd), messageBuffer);
00142       messageBuffer.size=0;
00143       messageBuffer.data.reset();
00144       return false;
00145    }
00146    if(transmitEmpty) return true;
00147    else return false;
00148 }
00149 
00150 void Fastcgipp::Transceiver::Buffer::freeRead(size_t size)
00151 {
00152    pRead+=size;
00153    if(pRead>=chunks.begin()->end)
00154    {
00155       if(writeIt==chunks.begin())
00156       {
00157          pRead=writeIt->data.get();
00158          writeIt->end=pRead;
00159       }
00160       else
00161       {
00162          if(writeIt==--chunks.end())
00163          {
00164             chunks.begin()->end=chunks.begin()->data.get();
00165             chunks.splice(chunks.end(), chunks, chunks.begin());
00166          }
00167          else
00168             chunks.pop_front();
00169          pRead=chunks.begin()->data.get();
00170       }
00171    }
00172    if((frames.front().size-=size)==0)
00173    {
00174       if(frames.front().closeFd)
00175          freeFd(frames.front().id.fd);
00176       frames.pop();
00177    }
00178 
00179 }
00180 
00181 void Fastcgipp::Transceiver::wake()
00182 {
00183    char x;
00184    write(wakeUpFdOut, &x, 1);
00185 }
00186 
00187 Fastcgipp::Transceiver::Transceiver(int fd_, boost::function<void(Protocol::FullId, Message)> sendMessage_)
00188 :buffer(pollFds, fdBuffers), sendMessage(sendMessage_), pollFds(2), socket(fd_)
00189 {
00190    socket=fd_;
00191    
00192    // Let's setup a in/out socket for waking up poll()
00193    int socPair[2];
00194    socketpair(AF_UNIX, SOCK_STREAM, 0, socPair);
00195    wakeUpFdIn=socPair[0];
00196    fcntl(wakeUpFdIn, F_SETFL, (fcntl(wakeUpFdIn, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);  
00197    wakeUpFdOut=socPair[1]; 
00198    
00199    fcntl(socket, F_SETFL, (fcntl(socket, F_GETFL)|O_NONBLOCK)^O_NONBLOCK);
00200    pollFds[0].events = POLLIN|POLLHUP;
00201    pollFds[0].fd = socket;
00202    pollFds[1].events = POLLIN|POLLHUP;
00203    pollFds[1].fd = wakeUpFdIn;
00204 }
00205 
00206 Fastcgipp::Exceptions::SocketWrite::SocketWrite(int fd_, int erno_): Socket(fd_, erno_)
00207 {
00208    switch(errno)
00209    {
00210       case EAGAIN:
00211          msg = "The file descriptor has been marked non-blocking (O_NONBLOCK) and the write would block.";
00212          break;
00213 
00214       case EBADF:
00215          msg = "The file descriptor is not a valid file descriptor or is not open for writing.";
00216          break;
00217 
00218       case EFAULT:
00219          msg = "The buffer is outside your accessible address space.";
00220          break;
00221 
00222       case EFBIG:
00223          msg = "An attempt was made to write a file that exceeds the implementation-defined maximum file size or the process’s file size limit, or to write at a position past the maximum allowed offset.";
00224          break;
00225 
00226       case EINTR:
00227          msg = "The call was interrupted by a signal before any data was written; see signal(7).";
00228          break;
00229 
00230       case EINVAL:
00231          msg = "The file descriptor is attached to an object which is unsuitable for writing; or the file was opened with the O_DIRECT flag, and either the address specified for the buffer, the value specified in count, or the current file offset is not suitably aligned.";
00232          break;
00233 
00234       case EIO:
00235          msg = "A low-level I/O error occurred while modifying the inode.";
00236          break;
00237 
00238       case ENOSPC:
00239          msg = "The device containing the file referred to by the file descriptor has no room for the data.";
00240          break;
00241 
00242       case EPIPE:
00243          msg = "The file descriptor is connected to a pipe or socket whose reading end is closed.  When this happens the writing process will also receive a SIGPIPE signal.  (Thus, the write return value is seen only if the program catches, blocks or ignores this signal.)";
00244          break;
00245    }
00246 }
00247 
00248 Fastcgipp::Exceptions::SocketRead::SocketRead(int fd_, int erno_): Socket(fd_, erno_)
00249 {
00250    switch(errno)
00251    {
00252       case EAGAIN:
00253          msg = "Non-blocking I/O has been selected using O_NONBLOCK and no data was immediately available for reading.";
00254          break;
00255 
00256       case EBADF:
00257          msg = "The file descriptor is not valid or is not open for reading.";
00258          break;
00259 
00260       case EFAULT:
00261          msg = "The buffer is outside your accessible address space.";
00262          break;
00263 
00264       case EINTR:
00265          msg = "The call was interrupted by a signal before any data was written; see signal(7).";
00266          break;
00267 
00268       case EINVAL:
00269          msg = "The file descriptor is attached to an object which is unsuitable for reading; or the file was opened with the O_DIRECT flag, and either the address specified in buf, the value specified in count, or the current file offset is not suitably aligned.";
00270          break;
00271 
00272       case EIO:
00273          msg = "I/O error.  This will happen for example when the process is in a background process group, tries to read from its controlling tty, and either it is ignoring or blocking SIGTTIN or its process group is orphaned.  It may also occur when there is a low-level I/O error while reading from a disk or tape.";
00274          break;
00275 
00276       case EISDIR:
00277          msg = "The file descriptor refers to a directory.";
00278          break;
00279    }
00280 }
00281 
00282 Fastcgipp::Exceptions::SocketPoll::SocketPoll(int erno_): CodedException(0, erno_)
00283 {
00284    switch(errno)
00285    {
00286       case EBADF:
00287          msg = "An invalid file descriptor was given in one of the sets.";
00288          break;
00289 
00290       case EFAULT:
00291          msg = "The array given as argument was not contained in the calling program’s address space.";
00292          break;
00293 
00294       case EINTR:
00295          msg = "A signal occurred before any requested event; see signal(7).";
00296          break;
00297 
00298       case EINVAL:
00299          msg = "The nfds value exceeds the RLIMIT_NOFILE value.";
00300          break;
00301 
00302       case ENOMEM:
00303          msg = "There was no space to allocate file descriptor tables.";
00304          break;
00305    }
00306 }
00307 
00308 void Fastcgipp::Transceiver::freeFd(int fd, std::vector<pollfd>& pollFds, std::map<int, fdBuffer>& fdBuffers)
00309 {
00310    std::vector<pollfd>::iterator it=std::find_if(pollFds.begin(), pollFds.end(), equalsFd(fd));
00311    if(it != pollFds.end())
00312    {
00313       pollFds.erase(it);
00314       close(fd);
00315       fdBuffers.erase(fd);
00316    }
00317 }