CCAFFEINE
0.8.8
|
00001 #ifdef CCAFE_THREADS /* not visible unless threads supported */ 00002 #ifndef Thread_seen 00003 #define Thread_seen 00004 00014 #include "pthread.h" 00015 #include <stdio.h> 00016 #include <signal.h> 00017 #include <semaphore.h> 00018 // #include <errno.h> 00019 00021 #ifdef OS_MACOSX 00022 # include "pth_rwlock.h" 00023 00024 #define pthread_rwlock_t pth_rwlock_t 00025 #define pthread_rwlock_init pth_rwlock_init 00026 #define pthread_rwlock_rdlock pth_rwlock_rdlock 00027 #define pthread_rwlock_wrlock pth_rwlock_wrlock 00028 #define pthread_rwlock_unlock pth_rwlock_unlock 00029 00030 #endif /* OS_MACOSX */ 00031 00032 00033 class CCAFECondition; 00034 00035 class CCAFERunnable { 00036 public: 00037 virtual void* run() = 0; 00038 00039 }; 00040 00041 00043 class CCAFEThread : public virtual jcpp::Object { 00044 private: 00045 CCAFERunnable* runnable; 00046 pthread_t thread_id; 00047 static void* startThread(void* targetThread); 00048 public: 00049 CCAFEThread() 00050 { 00051 runnable = NULL; 00052 }; 00053 00054 CCAFEThread(CCAFERunnable* newRunnable) 00055 { 00056 runnable = newRunnable; 00057 }; 00063 virtual ~CCAFEThread() 00064 { 00065 join(); 00066 } 00067 inline void setRunnable(CCAFERunnable* newRunnable) 00068 { 00069 runnable = newRunnable; 00070 } 00077 virtual void* run(); 00078 00079 virtual int start() 00080 { 00081 return pthread_create( &thread_id, NULL, &CCAFEThread::startThread, this); 00082 } 00083 virtual void* join(); 00093 inline static void enableInterrupts() { 00094 pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); 00095 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, NULL ); 00096 } 00100 inline static void disableInterrupts() { 00101 pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, NULL ); 00102 } 00114 inline int interrupt() 00115 { 00116 return pthread_cancel( thread_id ); 00117 } 00118 }; 00119 00120 00121 00122 00130 class CCAFEMutex { 00131 private: 00132 friend class CCAFECondition; 00133 pthread_mutex_t mutex_id; 00134 int unlock_internal() 00135 { 00136 return pthread_mutex_unlock(&mutex_id); 00137 }; 00138 public: 00139 00140 CCAFEMutex(); 00141 00142 ~CCAFEMutex() 00143 { 00144 int error = pthread_mutex_destroy(&mutex_id); 00145 if (error == EBUSY) 00146 { 00147 // if there are errors here, no way to recover - ignoring them 00148 pthread_mutex_unlock(&mutex_id); 00149 pthread_mutex_destroy(&mutex_id); 00150 } 00151 }; 00152 00154 inline int lock() 00155 { 00156 return pthread_mutex_lock(&mutex_id); 00157 }; 00158 00162 inline int tryLock() 00163 { 00164 return pthread_mutex_trylock(&mutex_id); 00165 }; 00166 00168 inline int unlock() 00169 { 00170 // remove the last cleanup function for the thread, since 00171 // the mutex is now unlocked. 00172 //pthread_mutex_cleanup_pop(0); 00173 return unlock_internal(); 00174 }; 00175 }; 00176 00183 class CCAFEMutexLock { 00184 private: 00185 CCAFEMutex* mutex; 00186 public: 00187 00189 CCAFEMutexLock(CCAFEMutex* newMutex) 00190 { 00191 mutex = newMutex; 00192 if (mutex) mutex->lock(); 00193 }; 00194 00196 ~CCAFEMutexLock() { if (mutex) mutex->unlock(); }; 00197 00198 }; 00199 00211 class CCAFECondition { 00212 private: 00213 pthread_cond_t cond; 00214 00215 public: 00216 00217 CCAFECondition() { 00218 pthread_cond_init(&cond, NULL); 00219 } 00220 00221 ~CCAFECondition() { pthread_cond_destroy (&cond); }; 00222 00226 inline int wait(CCAFEMutex* mutex) { return pthread_cond_wait (&cond, &(mutex->mutex_id)); }; 00227 00230 inline int signal() { return pthread_cond_signal (&cond); }; 00231 00234 inline int broadcast() { return pthread_cond_broadcast (&cond); }; 00235 }; 00236 00237 00238 class CCAFEReadLock; 00239 class CCAFEWriteLock; 00240 00249 class CCAFEReadWriteMutex { 00250 private: 00251 pthread_rwlock_t rwlock; 00252 public: 00253 00254 CCAFEReadWriteMutex() 00255 { 00256 pthread_rwlock_init(&rwlock, NULL); 00257 }; 00258 00259 protected: 00260 friend class CCAFEReadLock; 00261 friend class CCAFEWriteLock; 00262 00264 void readLock(){ 00265 pthread_rwlock_rdlock(&rwlock); 00266 } 00267 00269 void writeLock() { 00270 pthread_rwlock_wrlock(&rwlock); 00271 } 00272 00273 void unlock() { 00274 pthread_rwlock_unlock(&rwlock); 00275 } 00276 00277 }; 00278 00280 class CCAFEReadLock { 00281 private: 00282 CCAFEReadWriteMutex* rw_mut; 00283 public: 00284 00285 CCAFEReadLock( CCAFEReadWriteMutex* rw_mut_ ) { 00286 rw_mut = rw_mut_; 00287 rw_mut->readLock(); 00288 } 00289 00290 ~CCAFEReadLock() { 00291 rw_mut->unlock(); 00292 } 00293 }; 00295 class CCAFEWriteLock { 00296 private: 00297 CCAFEReadWriteMutex* rw_mut; 00298 public: 00299 00300 CCAFEWriteLock( CCAFEReadWriteMutex* rw_mut_ ) { 00301 rw_mut = rw_mut_; 00302 rw_mut->writeLock(); 00303 } 00304 00305 ~CCAFEWriteLock() { 00306 rw_mut->unlock(); 00307 } 00308 }; 00309 00317 class CCAFESemaphore { 00318 private: 00319 sem_t sem_id; 00320 public: 00322 CCAFESemaphore(){ 00323 sem_init(&sem_id, 0 /* not shared between processes*/, 0 /*initial value*/); 00324 } 00325 00326 ~CCAFESemaphore(){ 00327 sem_destroy(&sem_id); 00328 } 00329 00332 int wait() { 00333 return sem_wait(&sem_id); 00334 } 00335 00337 int tryWait() { 00338 return sem_trywait(&sem_id); 00339 } 00340 00342 int post() { 00343 return sem_post(&sem_id); 00344 } 00345 }; 00346 00347 class CCAFEQueueLock; 00348 00349 class CCAFEThreadSafeQueue { 00350 private: 00351 jcpp::Vector* list; // the list object used to implement the queue 00352 CCAFECondition condition; 00353 CCAFEMutex mutex; 00354 boolean amShutdown; 00355 00356 protected: 00357 friend class CCAFEQueueLock; 00362 void lockQueue() { 00363 mutex.lock(); 00364 }; 00365 00367 void unlockQueue() { 00368 mutex.unlock(); 00369 }; 00370 00371 public: 00372 CCAFEThreadSafeQueue(jcpp::Vector* newList) { list = newList; amShutdown = FALSE;}; 00373 00374 // BUGBUG do we "own" the vector? the items in the vector? 00375 ~CCAFEThreadSafeQueue() {}; 00376 00379 void addLast(jcpp::Object* elem) { 00380 CCAFEMutexLock lock(&mutex); 00381 list->add(elem); 00382 condition.signal(); 00383 }; 00384 00387 jcpp::Object* blockRemoveFirst() { 00388 CCAFEMutexLock lock(&mutex); 00389 while (((list->size()) <1) && (!amShutdown)) { 00390 // wait releases the mutex so that another thread can come 00391 // in and fill the list 00392 condition.wait(&mutex); 00393 } 00394 return list->remove(0); 00395 }; 00396 00399 jcpp::Object* nonblockRemoveFirst() { 00400 CCAFEMutexLock lock(&mutex); 00401 if (list->size() < 1) 00402 return NULL; 00403 else 00404 return list->remove(0); 00405 }; 00406 00408 boolean remove(jcpp::Object* elem) { 00409 CCAFEMutexLock lock(&mutex); 00410 return list->removeElement(elem); 00411 }; 00412 00413 int size() { 00414 CCAFEMutexLock lock(&mutex); 00415 return list->size(); 00416 }; 00417 00418 void ensureCapacity(int cap) { 00419 CCAFEMutexLock lock(&mutex); 00420 list->ensureCapacity(cap); 00421 }; 00422 00423 jcpp::Object* elementAt(int index) { 00424 CCAFEMutexLock lock(&mutex); 00425 return list->elementAt(index); 00426 00427 }; 00428 00431 void removeAllElements() { 00432 CCAFEMutexLock lock(&mutex); 00433 list->removeAllElements(); 00434 }; 00435 00436 boolean contains(jcpp::Object* obj) { 00437 CCAFEMutexLock lock(&mutex); 00438 return list->contains(obj); 00439 } 00440 00442 void shutdown() 00443 { 00444 CCAFEMutexLock lock(&mutex); 00445 amShutdown = TRUE; 00446 condition.broadcast(); 00447 } 00448 }; 00449 00450 00451 00452 00453 class CCAFEQueueLock { 00454 private: 00455 CCAFEThreadSafeQueue* queue; 00456 public: 00457 00458 CCAFEQueueLock(CCAFEThreadSafeQueue* queue_) { 00459 queue = queue_; 00460 queue->lockQueue(); 00461 } 00462 00463 ~CCAFEQueueLock() { 00464 queue->unlockQueue(); 00465 }; 00466 }; 00467 00468 00469 00470 00471 00472 00473 class CCAFEThreadPool; 00474 00475 00486 class CCAFEThreadPoolThread : public CCAFEThread { 00487 private: 00488 CCAFEThreadPool* pool; 00489 CCAFESemaphore done; 00490 CCAFESemaphore start_requested; 00491 boolean shutdown; 00492 void* returnValue; 00493 00494 protected: 00495 friend class CCAFEThreadPool; 00496 void initialize(); 00497 void stop(); 00498 void* joinFinal(); 00499 00500 public: 00501 00502 CCAFEThreadPoolThread(CCAFEThreadPool* newPool) 00503 { pool = newPool; shutdown = FALSE; }; 00506 ~CCAFEThreadPoolThread(); 00507 00510 void* run(); 00511 00513 int start(); 00514 00517 void* join(); 00518 }; 00519 00521 class CCAFEThreadPool { 00522 private: 00523 jcpp::Vector vec1; 00524 jcpp::Vector vec2; 00525 CCAFEThreadSafeQueue availableQueue; 00526 CCAFEThreadSafeQueue runningQueue; // neccessary for shutdown 00527 CCAFEMutex mutex; // must be very careful not to deadlock with the 00528 // queues. 00529 CCAFECondition cond; 00530 int maxSize; 00531 00532 protected: 00533 friend class CCAFEThreadPoolThread; 00534 00535 // may destroy the thread if there are max Size threads in the pool already 00536 void queueThread(CCAFEThread* thread); 00537 00538 void removeThread(CCAFEThread* thread); 00539 00540 public: 00541 CCAFEThreadPool(int size) :availableQueue(&vec1), runningQueue(&vec2) 00542 { 00543 maxSize = size; 00544 }; 00545 00546 CCAFEThread* dequeueThread(); 00547 00548 void setSize(int size); // the maximum # of threads this pool will hand out. 00549 00550 void shutdown(); // kill all threads in the pool 00551 00552 }; 00553 00554 #endif // thread.h seen 00555 #else 00556 extern int ccafe_no_jcpp_util_Thread; 00557 #endif // CCAFE_THREADS