fastcgi++
|
00001 00002 /*************************************************************************** 00003 * Copyright (C) 2007 Eddie Carle [eddie@erctech.org] * 00004 * * 00005 * This file is part of fastcgi++. * 00006 * * 00007 * fastcgi++ is free software: you can redistribute it and/or modify it * 00008 * under the terms of the GNU Lesser General Public License as published * 00009 * by the Free Software Foundation, either version 3 of the License, or (at * 00010 * your option) any later version. * 00011 * * 00012 * fastcgi++ is distributed in the hope that it will be useful, but WITHOUT * 00013 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * 00014 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public * 00015 * License for more details. * 00016 * * 00017 * You should have received a copy of the GNU Lesser General Public License * 00018 * along with fastcgi++. If not, see <http://www.gnu.org/licenses/>. * 00019 ****************************************************************************/ 00020 00021 00022 #ifndef MANAGER_HPP 00023 #define MANAGER_HPP 00024 00025 #include <map> 00026 #include <string> 00027 #include <queue> 00028 #include <algorithm> 00029 #include <cstring> 00030 00031 #include <boost/bind.hpp> 00032 #include <boost/shared_ptr.hpp> 00033 #include <boost/thread.hpp> 00034 #include <boost/thread/shared_mutex.hpp> 00035 00036 #include <signal.h> 00037 #include <pthread.h> 00038 00039 #include <fastcgi++/protocol.hpp> 00040 #include <fastcgi++/transceiver.hpp> 00041 00043 namespace Fastcgipp 00044 { 00046 00056 class ManagerPar 00057 { 00058 public: 00060 00070 ManagerPar(int fd, const boost::function<void(Protocol::FullId, Message)>& sendMessage_); 00071 00072 ~ManagerPar() { instance=0; } 00073 00075 00084 void stop(); 00085 00086 00088 00095 void setupSignals(); 00096 00098 size_t getMessagesSize() const { return messages.size(); } 00099 00100 protected: 00102 Transceiver transceiver; 00103 00105 00109 class Tasks: public std::queue<Protocol::FullId>, public boost::mutex {}; 00111 00114 Tasks tasks; 00115 00117 std::queue<Message> messages; 00118 00120 00128 void localHandler(Protocol::FullId id); 00129 00131 bool asleep; 00133 boost::mutex sleepMutex; 00135 00139 pthread_t threadId; 00140 00142 00145 bool stopBool; 00147 boost::mutex stopMutex; 00149 00152 bool terminateBool; 00154 boost::mutex terminateMutex; 00155 00156 private: 00158 static void signalHandler(int signum); 00160 static ManagerPar* instance; 00162 00170 inline void terminate(); 00171 }; 00172 00174 00184 template<class T> class Manager: public ManagerPar 00185 { 00186 public: 00188 00197 Manager(int fd=0): ManagerPar(fd, boost::bind(&Manager::push, boost::ref(*this), _1, _2)) {} 00198 00200 00207 void handler(); 00208 00210 00225 void push(Protocol::FullId id, Message message); 00226 00228 size_t getRequestsSize() const { return requests.size(); } 00229 private: 00231 00235 class Requests: public std::map<Protocol::FullId, boost::shared_ptr<T> >, public boost::shared_mutex {}; 00237 00241 Requests requests; 00242 }; 00243 } 00244 00245 template<class T> void Fastcgipp::Manager<T>::push(Protocol::FullId id, Message message) 00246 { 00247 using namespace std; 00248 using namespace Protocol; 00249 using namespace boost; 00250 00251 if(id.fcgiId) 00252 { 00253 shared_lock<shared_mutex> reqReadLock(requests); 00254 typename Requests::iterator it(requests.find(id)); 00255 if(it!=requests.end()) 00256 { 00257 lock_guard<mutex> mesLock(it->second->messages); 00258 it->second->messages.push(message); 00259 lock_guard<mutex> tasksLock(tasks); 00260 tasks.push(id); 00261 } 00262 else if(!message.type) 00263 { 00264 Header& header=*(Header*)message.data.get(); 00265 if(header.getType()==BEGIN_REQUEST) 00266 { 00267 BeginRequest& body=*(BeginRequest*)(message.data.get()+sizeof(Header)); 00268 00269 reqReadLock.unlock(); 00270 unique_lock<shared_mutex> reqWriteLock(requests); 00271 00272 boost::shared_ptr<T>& request = requests[id]; 00273 request.reset(new T); 00274 request->set(id, transceiver, body.getRole(), !body.getKeepConn(), boost::bind(&Manager::push, boost::ref(*this), id, _1)); 00275 } 00276 else 00277 return; 00278 } 00279 } 00280 else 00281 { 00282 messages.push(message); 00283 tasks.push(id); 00284 } 00285 00286 lock_guard<mutex> sleepLock(sleepMutex); 00287 if(asleep) 00288 transceiver.wake(); 00289 } 00290 00291 template<class T> void Fastcgipp::Manager<T>::handler() 00292 { 00293 using namespace std; 00294 using namespace boost; 00295 00296 threadId=pthread_self(); 00297 00298 while(1) 00299 {{ 00300 { 00301 lock_guard<mutex> stopLock(stopMutex); 00302 if(stopBool) 00303 { 00304 stopBool=false; 00305 return; 00306 } 00307 } 00308 00309 bool sleep=transceiver.handler(); 00310 00311 { 00312 lock_guard<mutex> terminateLock(terminateMutex); 00313 if(terminateBool) 00314 { 00315 shared_lock<shared_mutex> requestsLock(requests); 00316 if(requests.empty() && sleep) 00317 { 00318 terminateBool=false; 00319 return; 00320 } 00321 } 00322 } 00323 00324 unique_lock<mutex> tasksLock(tasks); 00325 unique_lock<mutex> sleepLock(sleepMutex); 00326 00327 if(tasks.empty()) 00328 { 00329 tasksLock.unlock(); 00330 00331 asleep=true; 00332 sleepLock.unlock(); 00333 00334 if(sleep) transceiver.sleep(); 00335 00336 sleepLock.lock(); 00337 asleep=false; 00338 sleepLock.unlock(); 00339 00340 continue; 00341 } 00342 00343 sleepLock.unlock(); 00344 00345 Protocol::FullId id=tasks.front(); 00346 tasks.pop(); 00347 tasksLock.unlock(); 00348 00349 if(id.fcgiId==0) 00350 localHandler(id); 00351 else 00352 { 00353 shared_lock<shared_mutex> reqReadLock(requests); 00354 typename map<Protocol::FullId, boost::shared_ptr<T> >::iterator it(requests.find(id)); 00355 if(it!=requests.end() && it->second->handler()) 00356 { 00357 reqReadLock.unlock(); 00358 unique_lock<shared_mutex> reqWriteLock(requests); 00359 00360 requests.erase(it); 00361 } 00362 } 00363 }} 00364 } 00365 00366 #endif