CCAFFEINE
0.8.8
|
#ifdef CCAFE_THREADS

#ifndef ServerMux_seen
#define ServerMux_seen

/**
 * Thread object whose run() calls notifyReads() on the ConnectionManager
 * it was constructed with.
 */
class IOThread : public CCAFEThread {
private:
  ConnectionManager* manager;
  boolean running;  // set to TRUE at construction; never read in this header

public:
  /** Remember the manager to drive and flag the object as running. */
  IOThread(ConnectionManager* mgr) : manager(mgr), running(TRUE) {}

  /** Invoke notifyReads() on the manager, then return NULL. */
  void* run() {
    manager->notifyReads();
    return NULL;
  }
};


/**
 * Thread object whose run() calls notifyReconnect() on the
 * ConnectionManager it was constructed with.
 */
class ConnectThread : public CCAFEThread {
private:
  ConnectionManager* manager;
  boolean running;  // set to TRUE at construction; never read in this header

public:
  /** Remember the manager to drive and flag the object as running. */
  ConnectThread(ConnectionManager* mgr) : manager(mgr), running(TRUE) {}

  /** Invoke notifyReconnect() on the manager, then return NULL. */
  void* run() {
    manager->notifyReconnect();
    return NULL;
  }
};


/**
 * Implements both ClientOutputListener and ClientOutputRelay and keeps a
 * list of OutOfBandListener objects.
 *
 * NOTE(review): judging by the member and method names, this class relays
 * messages between one controller client and a set of computational
 * clients -- confirm against the implementation file.
 */
class ServerMux : public virtual ClientOutputListener,
                  public virtual ClientOutputRelay {
private:
  Client* controllerClient;
  CCAFEThread* controllerThread;
  Client** clients;
  int numClients;  // NOTE(review): presumably the length of `clients` -- confirm
  CCAFEThreadPool threadpool;
  boolean isStarted;  // value reported by isRunning()
  ConnectionManager* computationalClientManager;
  ClientOutputCollector* dataCollector;
  JCPN(Vector) listeners;  // modified by add/removeOutOfBandListener()
  // IOThread and ConnectThread declare no default constructor, so the
  // ServerMux constructor has to initialize both of these explicitly.
  IOThread ioThread;
  ConnectThread connThread;
  static char* outOfBandToken;
  DataCollectorFactory dataCollectorFactory;
  CCAFEReadWriteMutex dataCollectorMutex;

public:
  // String constants; they are defined outside this header. The trailing
  // comments record the values noted by the original author.
  static const char* SERVER_SRC;          // = "ServerMux";
  static const char* DATA_COLLECTOR_ACK;  // = "DATA_COLLECTOR_ACK";
  static const char* DATA_COLLECTOR_ERR;  // = "DATA_COLLECTOR_NOT_SET";
  static const char* DATA_COLLECTOR_MSG;  // = "DATA_COLLECTOR=";
  static const char* SHUTDOWN_MSG;        // = "SHUTDOWN_MSG";
  static const char* REMOVE_CLIENT_MSG;   // = "REMOVE_CLIENT_MSG";

  ServerMux(ConnectionManager* computeManager, Connection* controllerConnect);
  ~ServerMux();
  void shutdown();
  void join();

  // BUGBUG - no longer applicable - how do we tell when to shutdown?
  CCAFEThread* getControllerClientThread();

  void setExternalClientOutputListener(ClientOutputListener* xLsnr);
  void doClientIO();
  void broadcastToClients(const char* s);

  /** Return the current value of the isStarted flag. */
  boolean isRunning() { return isStarted; }

  // ---- ClientOutputListener ----
  virtual void clientOutput(ClientOutputEvent* evt);

  // ---- ClientOutputRelay ----
  virtual void relayMessageFromDataProducers(char* s);
  virtual void relayMessageFromController(char* s);
  virtual void setDataCollectorByName(char* className);
  virtual int getNumClients();

  // ---- OutOfBandListener bookkeeping ----

  /** Append l to the listener list. */
  void addOutOfBandListener(OutOfBandListener* l) {
    listeners.addElement(l);
  }

  /** Remove l from the listener list. */
  void removeOutOfBandListener(OutOfBandListener* l) {
    listeners.removeElement(l);
  }

  static const char* getOutofBandToken();

private:
  void doOutOfBandCommands(char* line, Client* src);
  void fireOOBListeners(char* cmd, Client* client);
};

#endif  // seen servermux

#else

extern int ccafe_no_servermux;

#endif  // CCAFE_THREADS