fastcgi++
manager.hpp
Go to the documentation of this file.
00001 
00002 /***************************************************************************
00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org]                       *
00004 *                                                                          *
00005 * This file is part of fastcgi++.                                          *
00006 *                                                                          *
00007 * fastcgi++ is free software: you can redistribute it and/or modify it     *
00008 * under the terms of the GNU Lesser General Public License as  published   *
00009 * by the Free Software Foundation, either version 3 of the License, or (at *
00010 * your option) any later version.                                          *
00011 *                                                                          *
00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT *
00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or    *
00014 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public     *
00015 * License for more details.                                                *
00016 *                                                                          *
00017 * You should have received a copy of the GNU Lesser General Public License *
00018 * along with fastcgi++.  If not, see <http://www.gnu.org/licenses/>.       *
00019 ****************************************************************************/
00020 
00021 
00022 #ifndef MANAGER_HPP
00023 #define MANAGER_HPP
00024 
00025 #include <map>
00026 #include <string>
00027 #include <queue>
00028 #include <algorithm>
00029 #include <cstring>
00030 
00031 #include <boost/bind.hpp>
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/shared_mutex.hpp>
00035 
00036 #include <signal.h>
00037 #include <pthread.h>
00038 
00039 #include <fastcgi++/protocol.hpp>
00040 #include <fastcgi++/transceiver.hpp>
00041 
00043 namespace Fastcgipp
00044 {
00046 
00056    class ManagerPar
00057    {
00058    public:
00060 
00070       ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_);
00071 
00072       ~ManagerPar() { instance=0; }
00073 
00075 
00084       void stop();
00085 
00086       
00088 
00095       void setupSignals();
00096 
00098       size_t getMessagesSize() const { return messages.size(); }
00099 
00100    protected:
00102       Transceiver transceiver;
00103 
00105 
00109       class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {};
00111 
00114       Tasks tasks;
00115 
00117       std::queue<Message> messages;
00118 
00120 
00128       void localHandler(Protocol::FullId id);
00129 
00131       bool asleep;
00133       boost::mutex sleepMutex;
00135 
00139       pthread_t threadId;
00140 
00142 
00145       bool stopBool;
00147       boost::mutex stopMutex;
00149 
00152       bool terminateBool;
00154       boost::mutex terminateMutex;
00155 
00156    private:
00158       static void signalHandler(int signum);
00160       static ManagerPar* instance;
00162 
00170       inline void terminate();
00171    };
00172    
00174 
00184    template<class T> class Manager: public ManagerPar
00185    {
00186    public:
00188 
00197       Manager(int fd=0): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2)) {}
00198 
00200 
00207       void handler();
00208 
00210 
00225       void push(Protocol::FullId id, Message message);
00226 
00228       size_t getRequestsSize() const { return requests.size(); }
00229    private:
00231 
00235       class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {};
00237 
00241       Requests requests;
00242    };
00243 }
00244 
00245 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message)
00246 {
00247    using namespace std;
00248    using namespace Protocol;
00249    using namespace boost;
00250 
00251    if(id.fcgiId)
00252    {
00253       shared_lock<shared_mutex> reqReadLock(requests);
00254       typename Requests::iterator it(requests.find(id));
00255       if(it!=requests.end())
00256       {
00257          lock_guard<mutex> mesLock(it->second->messages);
00258          it->second->messages.push(message);
00259          lock_guard<mutex> tasksLock(tasks);
00260          tasks.push(id);
00261       }
00262       else if(!message.type)
00263       {
00264          Header& header=*(Header*)message.data.get();
00265          if(header.getType()==BEGIN_REQUEST)
00266          {
00267             BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header));
00268 
00269             reqReadLock.unlock();
00270             unique_lock<shared_mutex> reqWriteLock(requests);
00271 
00272             boost::shared_ptr<T>& request = requests[id];
00273             request.reset(new T);
00274             request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1));
00275          }
00276          else
00277             return;
00278       }
00279    }
00280    else
00281    {
00282       messages.push(message);
00283       tasks.push(id);
00284    }
00285 
00286    lock_guard<mutex> sleepLock(sleepMutex);
00287    if(asleep)
00288       transceiver.wake();
00289 }
00290 
00291 template<class T> void Fastcgipp::Manager<T>::handler()
00292 {
00293    using namespace std;
00294    using namespace boost;
00295 
00296    threadId=pthread_self();
00297 
00298    while(1)
00299    {{
00300       {
00301          lock_guard<mutex> stopLock(stopMutex);
00302          if(stopBool)
00303          {
00304             stopBool=false;
00305             return;
00306          }
00307       }
00308       
00309       bool sleep=transceiver.handler();
00310 
00311       {
00312          lock_guard<mutex> terminateLock(terminateMutex);
00313          if(terminateBool)
00314          {
00315             shared_lock<shared_mutex> requestsLock(requests);
00316             if(requests.empty() && sleep)
00317             {
00318                terminateBool=false;
00319                return;
00320             }
00321          }
00322       }
00323 
00324       unique_lock<mutex> tasksLock(tasks);
00325       unique_lock<mutex> sleepLock(sleepMutex);
00326 
00327       if(tasks.empty())
00328       {
00329          tasksLock.unlock();
00330 
00331          asleep=true;
00332          sleepLock.unlock();
00333 
00334          if(sleep) transceiver.sleep();
00335 
00336          sleepLock.lock();
00337          asleep=false;
00338          sleepLock.unlock();
00339 
00340          continue;
00341       }
00342 
00343       sleepLock.unlock();
00344 
00345       Protocol::FullId id=tasks.front();
00346       tasks.pop();
00347       tasksLock.unlock();
00348 
00349       if(id.fcgiId==0)
00350          localHandler(id);
00351       else
00352       {
00353          shared_lock<shared_mutex> reqReadLock(requests);
00354          typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id));
00355          if(it!=requests.end() && it->second->handler())
00356          {
00357             reqReadLock.unlock();
00358             unique_lock<shared_mutex> reqWriteLock(requests);
00359 
00360             requests.erase(it);
00361          }
00362       }
00363    }}
00364 }
00365 
00366 #endif