CCAFFEINE  0.8.8
Thread.h
00001 #ifdef CCAFE_THREADS /* not visible unless threads supported */
00002 #ifndef Thread_seen
00003 #define Thread_seen
00004 
00014 #include "pthread.h"
00015 #include <stdio.h>
00016 #include <signal.h>
00017 #include <semaphore.h>
00018 // #include <errno.h>
00019 
00021 #ifdef OS_MACOSX
00022 # include "pth_rwlock.h"
00023 
00024 #define pthread_rwlock_t      pth_rwlock_t
00025 #define pthread_rwlock_init   pth_rwlock_init
00026 #define pthread_rwlock_rdlock pth_rwlock_rdlock
00027 #define pthread_rwlock_wrlock pth_rwlock_wrlock
00028 #define pthread_rwlock_unlock pth_rwlock_unlock
00029 
00030 #endif /* OS_MACOSX */
00031 
00032 
00033 class CCAFECondition;
00034 
00035 class CCAFERunnable {
00036 public:
00037   virtual void* run() = 0;
00038 
00039 };
00040 
00041 
00043 class CCAFEThread : public virtual jcpp::Object {
00044 private:
00045   CCAFERunnable* runnable;
00046   pthread_t thread_id;
00047   static void* startThread(void* targetThread);
00048 public:
00049   CCAFEThread()
00050   {
00051     runnable = NULL;
00052   };
00053   
00054   CCAFEThread(CCAFERunnable* newRunnable)
00055   {
00056     runnable = newRunnable;
00057   };
00063   virtual ~CCAFEThread() 
00064   {
00065     join();
00066   }
00067   inline void setRunnable(CCAFERunnable* newRunnable) 
00068   {
00069     runnable = newRunnable;
00070   }
00077   virtual void* run(); 
00078   
00079   virtual int start() 
00080   {
00081     return pthread_create( &thread_id, NULL, &CCAFEThread::startThread, this);
00082   }
00083   virtual void* join(); 
00093   inline static void enableInterrupts() {
00094     pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
00095     pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, NULL );
00096   }
00100   inline static void disableInterrupts() {
00101     pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, NULL );
00102   }
00114   inline int interrupt()
00115   {
00116     return pthread_cancel( thread_id );
00117   }
00118 };
00119 
00120 
00121 
00122 
00130 class CCAFEMutex {
00131 private:
00132   friend class CCAFECondition;
00133   pthread_mutex_t mutex_id;
00134   int unlock_internal()
00135   {
00136     return pthread_mutex_unlock(&mutex_id);
00137   };
00138 public:
00139   
00140   CCAFEMutex(); 
00141   
00142   ~CCAFEMutex()
00143   {
00144     int error = pthread_mutex_destroy(&mutex_id);
00145     if (error == EBUSY)
00146       {
00147         // if there are errors here, no way to recover - ignoring them
00148         pthread_mutex_unlock(&mutex_id);
00149         pthread_mutex_destroy(&mutex_id);
00150       }
00151   };
00152   
00154   inline int lock()
00155   { 
00156     return pthread_mutex_lock(&mutex_id);
00157   };
00158   
00162   inline int tryLock()
00163   {
00164     return pthread_mutex_trylock(&mutex_id);
00165   };
00166   
00168   inline int unlock()
00169   {
00170     // remove the last cleanup function for the thread, since
00171     // the mutex is now unlocked.
00172     //pthread_mutex_cleanup_pop(0);
00173     return unlock_internal();
00174   };
00175 };
00176 
00183 class CCAFEMutexLock {
00184 private:
00185   CCAFEMutex* mutex;
00186 public:
00187 
00189   CCAFEMutexLock(CCAFEMutex* newMutex) 
00190   { 
00191     mutex = newMutex; 
00192     if (mutex) mutex->lock(); 
00193   };
00194 
00196   ~CCAFEMutexLock()   { if (mutex) mutex->unlock(); };
00197   
00198 };
00199 
00211 class CCAFECondition {
00212 private:
00213   pthread_cond_t cond;
00214 
00215 public:
00216 
00217   CCAFECondition() {
00218     pthread_cond_init(&cond, NULL);
00219   }
00220 
00221   ~CCAFECondition() { pthread_cond_destroy (&cond); };
00222   
00226   inline int wait(CCAFEMutex* mutex) { return pthread_cond_wait (&cond, &(mutex->mutex_id)); };
00227   
00230   inline int signal() { return pthread_cond_signal (&cond); };
00231   
00234   inline int broadcast() { return pthread_cond_broadcast (&cond); };
00235 };
00236 
00237 
00238 class CCAFEReadLock;
00239 class CCAFEWriteLock;
00240 
00249 class CCAFEReadWriteMutex {
00250 private:
00251   pthread_rwlock_t rwlock;
00252 public:
00253 
00254   CCAFEReadWriteMutex()
00255   {
00256     pthread_rwlock_init(&rwlock, NULL);
00257   };
00258 
00259 protected:
00260   friend class CCAFEReadLock;
00261   friend class CCAFEWriteLock;
00262   
00264   void readLock(){
00265     pthread_rwlock_rdlock(&rwlock);
00266   }
00267 
00269   void writeLock() {
00270     pthread_rwlock_wrlock(&rwlock);
00271   }
00272 
00273   void unlock() {
00274     pthread_rwlock_unlock(&rwlock);
00275   }
00276 
00277 };
00278 
00280 class CCAFEReadLock {
00281 private:
00282   CCAFEReadWriteMutex* rw_mut;
00283 public:
00284 
00285   CCAFEReadLock( CCAFEReadWriteMutex* rw_mut_ ) {
00286     rw_mut = rw_mut_;
00287     rw_mut->readLock();
00288   }
00289 
00290   ~CCAFEReadLock() {
00291     rw_mut->unlock();
00292   }
00293 };
00295 class CCAFEWriteLock {
00296 private:
00297   CCAFEReadWriteMutex* rw_mut;
00298 public:
00299 
00300   CCAFEWriteLock( CCAFEReadWriteMutex* rw_mut_ ) {
00301     rw_mut = rw_mut_;
00302     rw_mut->writeLock();
00303   }
00304 
00305   ~CCAFEWriteLock() {
00306     rw_mut->unlock();
00307   }
00308 };
00309 
00317 class CCAFESemaphore {
00318 private:
00319   sem_t sem_id;
00320 public:
00322   CCAFESemaphore(){
00323     sem_init(&sem_id, 0 /* not shared between processes*/, 0 /*initial value*/);
00324   }
00325 
00326   ~CCAFESemaphore(){
00327     sem_destroy(&sem_id);
00328   }
00329 
00332   int wait() {
00333     return sem_wait(&sem_id);
00334   }
00335 
00337   int tryWait() {
00338     return sem_trywait(&sem_id);
00339   }
00340 
00342   int post() {
00343     return sem_post(&sem_id);
00344   }
00345 };
00346 
00347 class CCAFEQueueLock;
00348 
00349 class CCAFEThreadSafeQueue {
00350 private:
00351   jcpp::Vector* list; // the list object used to implement the queue
00352   CCAFECondition condition;
00353   CCAFEMutex mutex;
00354   boolean amShutdown;
00355 
00356  protected:
00357   friend class CCAFEQueueLock;
00362   void lockQueue() {
00363     mutex.lock();
00364   };
00365 
00367   void unlockQueue() {
00368     mutex.unlock();
00369   };
00370 
00371 public:
00372   CCAFEThreadSafeQueue(jcpp::Vector* newList) { list = newList; amShutdown = FALSE;};
00373   
00374   // BUGBUG do we "own" the vector? the items in the vector?
00375   ~CCAFEThreadSafeQueue() {};
00376   
00379   void addLast(jcpp::Object* elem) {
00380     CCAFEMutexLock lock(&mutex);
00381     list->add(elem);
00382     condition.signal();
00383   };
00384 
00387   jcpp::Object* blockRemoveFirst() {
00388     CCAFEMutexLock lock(&mutex);
00389     while (((list->size()) <1) && (!amShutdown)) {
00390       // wait releases the mutex so that another thread can come
00391       // in and fill the list
00392       condition.wait(&mutex);
00393     }
00394     return list->remove(0);
00395   };
00396 
00399   jcpp::Object* nonblockRemoveFirst() {
00400     CCAFEMutexLock lock(&mutex);
00401     if (list->size() < 1)
00402       return NULL;
00403     else
00404       return list->remove(0);
00405   };
00406 
00408   boolean remove(jcpp::Object* elem) {
00409     CCAFEMutexLock lock(&mutex);
00410     return list->removeElement(elem); 
00411   };
00412 
00413   int size() {
00414     CCAFEMutexLock lock(&mutex);
00415     return list->size();
00416   };
00417   
00418   void ensureCapacity(int cap) {
00419     CCAFEMutexLock lock(&mutex);
00420     list->ensureCapacity(cap);
00421   };
00422   
00423   jcpp::Object* elementAt(int index) {
00424     CCAFEMutexLock lock(&mutex);
00425     return list->elementAt(index);
00426     
00427   };
00428   
00431   void removeAllElements() {
00432     CCAFEMutexLock lock(&mutex);
00433     list->removeAllElements();
00434   };
00435   
00436   boolean contains(jcpp::Object* obj) {
00437     CCAFEMutexLock lock(&mutex);
00438     return list->contains(obj);
00439   }
00440 
00442   void shutdown()
00443   {
00444     CCAFEMutexLock lock(&mutex);
00445     amShutdown = TRUE;
00446     condition.broadcast();
00447   }
00448 };
00449 
00450 
00451 
00452 
00453 class CCAFEQueueLock {
00454  private:
00455   CCAFEThreadSafeQueue* queue;
00456  public:
00457 
00458   CCAFEQueueLock(CCAFEThreadSafeQueue* queue_) {
00459     queue = queue_;
00460     queue->lockQueue();
00461   }
00462 
00463   ~CCAFEQueueLock() {
00464     queue->unlockQueue();
00465   };
00466 };
00467 
00468 
00469 
00470 
00471 
00472 
00473 class CCAFEThreadPool;
00474 
00475 
00486 class CCAFEThreadPoolThread : public CCAFEThread {
00487 private:
00488   CCAFEThreadPool* pool;
00489   CCAFESemaphore done;
00490   CCAFESemaphore start_requested;
00491   boolean shutdown;
00492   void* returnValue;
00493 
00494 protected:
00495   friend class CCAFEThreadPool;
00496   void initialize();
00497   void stop();
00498   void* joinFinal();
00499 
00500 public:
00501 
00502   CCAFEThreadPoolThread(CCAFEThreadPool* newPool) 
00503   { pool = newPool; shutdown = FALSE; };
00506   ~CCAFEThreadPoolThread(); 
00507 
00510   void* run(); 
00511 
00513   int start();
00514 
00517   void* join();
00518 };
00519 
00521 class CCAFEThreadPool {
00522 private:
00523   jcpp::Vector vec1;
00524   jcpp::Vector vec2;
00525   CCAFEThreadSafeQueue availableQueue;
00526   CCAFEThreadSafeQueue runningQueue; // neccessary for shutdown
00527   CCAFEMutex mutex; // must be very careful not to deadlock with the 
00528   // queues.
00529   CCAFECondition cond;
00530   int maxSize;
00531 
00532 protected:
00533   friend class CCAFEThreadPoolThread;
00534 
00535   // may destroy the thread if there are max Size threads in the pool already
00536   void queueThread(CCAFEThread* thread);
00537 
00538   void removeThread(CCAFEThread* thread);
00539 
00540 public:
00541   CCAFEThreadPool(int size) :availableQueue(&vec1), runningQueue(&vec2)
00542   { 
00543     maxSize = size;
00544   };
00545 
00546   CCAFEThread* dequeueThread(); 
00547 
00548   void setSize(int size); // the maximum # of threads this pool will hand out. 
00549 
00550   void shutdown(); // kill all threads in the pool
00551 
00552 };
00553 
00554 #endif // thread.h seen
00555 #else
00556 extern int ccafe_no_jcpp_util_Thread;
00557 #endif // CCAFE_THREADS