CCAFFEINE  0.8.8
ServerMux.h
00001 #ifdef CCAFE_THREADS
00002 
00003 #ifndef ServerMux_seen
00004 #define ServerMux_seen
00005 
/**
 * Thread object whose run() makes a single call to
 * ConnectionManager::notifyReads() on the manager supplied at construction
 * and then returns NULL.
 */
00024 class IOThread : public CCAFEThread {
00025 private:
      // Manager that run() calls into. This class declares no destructor,
      // so it never releases the pointer itself.
00026   ConnectionManager* manager;
      // Set to TRUE by the constructor; nothing in this class reads it.
00027   boolean running;
00028 public:
      // Stores the manager pointer and marks the object as running.
      // Initializers are listed in member declaration order.
00029   IOThread(ConnectionManager* manager) : manager(manager), running(TRUE) {}
      // Thread body: one notifyReads() call, then a NULL result.
00030   void* run() {
00032     manager->notifyReads();
00033     return NULL;
00034   }
00035 };
00036 
00037 
00038 
00039 
00040 
/**
 * Thread object whose run() makes a single call to
 * ConnectionManager::notifyReconnect() on the manager supplied at
 * construction and then returns NULL.
 */
00045 class ConnectThread : public CCAFEThread {
00046 private:
      // Manager that run() calls into. This class declares no destructor,
      // so it never releases the pointer itself.
00047   ConnectionManager* manager;
      // Set to TRUE by the constructor; nothing in this class reads it.
00048   boolean running;
00049 public:
      // Stores the manager pointer and marks the object as running.
      // Initializers are listed in member declaration order.
00050   ConnectThread(ConnectionManager* manager) : manager(manager), running(TRUE) {}
      // Thread body: one notifyReconnect() call, then a NULL result.
00051   void* run() {
00053     manager->notifyReconnect();
00054     return NULL;
00055   }
00056 };
00057 
00058 
00059 
00060 
00061 
/**
 * ServerMux implements both the ClientOutputListener and ClientOutputRelay
 * interfaces (inherited virtually). It is constructed from a
 * ConnectionManager and a controller Connection, holds a set of Client
 * pointers, and lets OutOfBandListener objects register and unregister.
 * All non-inline members are defined outside this header.
 *
 * NOTE(review): the class declares a destructor and holds raw pointers but
 * declares no copy constructor or copy assignment -- confirm that copying a
 * ServerMux is never intended.
 */
00065 class ServerMux :public virtual ClientOutputListener, 
00066                  public virtual ClientOutputRelay {
00067 private:
        // Presumably the client/thread pair serving the controller connection
        // passed to the constructor -- TODO confirm in the implementation file.
00068   Client* controllerClient;
00069   CCAFEThread* controllerThread;
        // Array of Client pointers; numClients is presumably its length --
        // TODO confirm.
00070   Client** clients;
00071   int numClients;
00072   CCAFEThreadPool threadpool;
        // Value reported by isRunning().
00073   boolean isStarted;
        // Presumably the computeManager constructor argument -- TODO confirm.
00074   ConnectionManager* computationalClientManager;
00075   ClientOutputCollector* dataCollector;
        // Registered OutOfBandListener pointers; see addOutOfBandListener()
        // and removeOutOfBandListener().
00076   JCPN(Vector) listeners;
        // IOThread and ConnectThread declare only a one-argument constructor,
        // so both members must be initialized in the ServerMux constructor's
        // initializer list.
00077   IOThread ioThread;
00078   ConnectThread connThread;
00079   static char* outOfBandToken;
00080   DataCollectorFactory dataCollectorFactory;
        // NOTE(review): by its name this guards dataCollector -- confirm.
00081   CCAFEReadWriteMutex dataCollectorMutex;
00082 
00083 public:
        // String constants defined out of line. The trailing comments record
        // what look like the intended values -- confirm against the
        // definitions.
00084   static const char* SERVER_SRC ;// = "ServerMux";
00085   static const char* DATA_COLLECTOR_ACK ;// = "DATA_COLLECTOR_ACK";
00086   static const char* DATA_COLLECTOR_ERR ;// = "DATA_COLLECTOR_NOT_SET";
00087   static const char* DATA_COLLECTOR_MSG ;// = "DATA_COLLECTOR=";
00088   static const char* SHUTDOWN_MSG  ;//= "SHUTDOWN_MSG";
00089   static const char* REMOVE_CLIENT_MSG ;// = "REMOVE_CLIENT_MSG";
        // Takes the manager for the computational clients and the connection
        // to the controller. Ownership of either pointer is not visible
        // here -- TODO confirm.
00090   ServerMux(ConnectionManager* computeManager, Connection* controllerConnect);
00091   ~ServerMux();
00092   void shutdown();
00093   void join();
00094 
00095   // BUGBUG - no longer applicable - how do we tell when to shutdown?
00096   CCAFEThread* getControllerClientThread();
00097 
00098   void setExternalClientOutputListener(ClientOutputListener* xLsnr);
00099   void doClientIO();
00100   void broadcastToClients(const char* s);
        // Returns the current value of isStarted.
00101   boolean isRunning() { return isStarted; };
00102 
00103   // ClientOutputListener
00104   virtual void clientOutput(ClientOutputEvent* evt);
00105 
00106   // ClientOutputRelay
00107   virtual void relayMessageFromDataProducers(char* s);
00108   virtual void relayMessageFromController(char* s);
00109   virtual void setDataCollectorByName( char* className );
00110   virtual int getNumClients();
00111 
00112   // related to OutOfBandListeners
        // Appends l to the listeners vector.
00113   void addOutOfBandListener(OutOfBandListener* l) {
00114     listeners.addElement(l);
00115   }
        // Removes l from the listeners vector.
00116   void removeOutOfBandListener(OutOfBandListener* l) {
00117     listeners.removeElement(l);
00118   }
        // Static accessor; presumably returns outOfBandToken -- TODO confirm.
00129   static const char* getOutofBandToken(); 
00130 
00131 private:
        // Internal helpers for out-of-band handling; their behavior is defined
        // outside this header.
00133   void doOutOfBandCommands(char* line, Client* src);
00135   void fireOOBListeners(char* cmd, Client* client);
00136 };
00137 #endif // seen servermux
00138 #else
// Without CCAFE_THREADS this is the only declaration the header provides.
// Presumably a placeholder so that the header is not empty -- TODO confirm.
00139 extern int ccafe_no_servermux;
00140 #endif // CCAFE_THREADS