Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifndef MANAGER_HPP
00023 #define MANAGER_HPP
00024
00025 #include <map>
00026 #include <string>
00027 #include <queue>
00028 #include <algorithm>
00029 #include <cstring>
00030
00031 #include <boost/bind.hpp>
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/thread.hpp>
00034 #include <boost/thread/shared_mutex.hpp>
00035
00036 #include <signal.h>
00037 #include <pthread.h>
00038
00039 #include <fastcgi++/protocol.hpp>
00040 #include <fastcgi++/transceiver.hpp>
00041
00043 namespace Fastcgipp
00044 {
00046
00056 class ManagerPar
00057 {
00058 public:
00060
00070 ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_);
00071
00072 ~ManagerPar() { instance=0; }
00073
00075
00084 void stop();
00085
00086
00088
00095 void setupSignals();
00096 protected:
00098 Transceiver transceiver;
00099
00101
00105 class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {};
00107
00110 Tasks tasks;
00111
00113 std::queue<Message> messages;
00114
00116
00124 void localHandler(Protocol::FullId id);
00125
00127 bool asleep;
00129 boost::mutex sleepMutex;
00131
00135 pthread_t threadId;
00136
00138
00141 bool stopBool;
00143 boost::mutex stopMutex;
00145
00148 bool terminateBool;
00150 boost::mutex terminateMutex;
00151
00152 private:
00154 static void signalHandler(int signum);
00156 static ManagerPar* instance;
00158
00166 inline void terminate();
00167 };
00168
00170
00180 template<class T> class Manager: public ManagerPar
00181 {
00182 public:
00184
00193 Manager(int fd=0): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2)) {}
00194
00196
00203 void handler();
00204
00206
00221 void push(Protocol::FullId id, Message message);
00222 private:
00224
00228 class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {};
00230
00234 Requests requests;
00235 };
00236 }
00237
00238 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message)
00239 {
00240 using namespace std;
00241 using namespace Protocol;
00242 using namespace boost;
00243
00244 if(id.fcgiId)
00245 {
00246 upgrade_lock<shared_mutex> reqLock(requests);
00247 typename Requests::iterator it(requests.find(id));
00248 if(it!=requests.end())
00249 {
00250 lock_guard<mutex> mesLock(it->second->messages);
00251 it->second->messages.push(message);
00252 lock_guard<mutex> tasksLock(tasks);
00253 tasks.push(id);
00254 }
00255 else if(!message.type)
00256 {
00257 Header& header=*(Header*)message.data.get();
00258 if(header.getType()==BEGIN_REQUEST)
00259 {
00260 BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header));
00261 upgrade_to_unique_lock<shared_mutex> lock(reqLock);
00262 boost::shared_ptr<T>& request = requests[id];
00263 request.reset(new T);
00264 request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1));
00265 }
00266 else
00267 return;
00268 }
00269 }
00270 else
00271 {
00272 messages.push(message);
00273 tasks.push(id);
00274 }
00275
00276 lock_guard<mutex> sleepLock(sleepMutex);
00277 if(asleep)
00278 transceiver.wake();
00279 }
00280
00281 template<class T> void Fastcgipp::Manager<T>::handler()
00282 {
00283 using namespace std;
00284 using namespace boost;
00285
00286 threadId=pthread_self();
00287
00288 while(1)
00289 {{
00290 {
00291 lock_guard<mutex> stopLock(stopMutex);
00292 if(stopBool)
00293 {
00294 stopBool=false;
00295 return;
00296 }
00297 }
00298
00299 bool sleep=transceiver.handler();
00300
00301 {
00302 lock_guard<mutex> terminateLock(terminateMutex);
00303 if(terminateBool)
00304 {
00305 shared_lock<shared_mutex> requestsLock(requests);
00306 if(requests.empty() && sleep)
00307 {
00308 terminateBool=false;
00309 return;
00310 }
00311 }
00312 }
00313
00314 unique_lock<mutex> tasksLock(tasks);
00315 unique_lock<mutex> sleepLock(sleepMutex);
00316
00317 if(tasks.empty())
00318 {
00319 tasksLock.unlock();
00320
00321 asleep=true;
00322 sleepLock.unlock();
00323
00324 if(sleep) transceiver.sleep();
00325
00326 sleepLock.lock();
00327 asleep=false;
00328 sleepLock.unlock();
00329
00330 continue;
00331 }
00332
00333 sleepLock.unlock();
00334
00335 Protocol::FullId id=tasks.front();
00336 tasks.pop();
00337 tasksLock.unlock();
00338
00339 if(id.fcgiId==0)
00340 localHandler(id);
00341 else
00342 {
00343 upgrade_lock<shared_mutex> reqReadLock(requests);
00344 typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id));
00345 if(it!=requests.end() && it->second->handler())
00346 {
00347 upgrade_to_unique_lock<shared_mutex> reqWriteLock(reqReadLock);
00348 requests.erase(it);
00349 }
00350 }
00351 }}
00352 }
00353
00354 #endif