00001 #ifdef CCAFE_THREADS
00002 #ifndef Thread_seen
00003 #define Thread_seen
00004
00014 #include "pthread.h"
00015 #include <stdio.h>
00016 #include <signal.h>
00017 #include <semaphore.h>
00018
00019
00021 #ifdef OS_MACOSX
00022 # include "pth_rwlock.h"
00023
00024 #define pthread_rwlock_t pth_rwlock_t
00025 #define pthread_rwlock_init pth_rwlock_init
00026 #define pthread_rwlock_rdlock pth_rwlock_rdlock
00027 #define pthread_rwlock_wrlock pth_rwlock_wrlock
00028 #define pthread_rwlock_unlock pth_rwlock_unlock
00029
00030 #endif
00031
00032
00033 class CCAFECondition;
00034
00035 class CCAFERunnable {
00036 public:
00037 virtual void* run() = 0;
00038
00039 };
00040
00041
00043 class CCAFEThread : public virtual jcpp::Object {
00044 private:
00045 CCAFERunnable* runnable;
00046 pthread_t thread_id;
00047 static void* startThread(void* targetThread);
00048 public:
00049 CCAFEThread()
00050 {
00051 runnable = NULL;
00052 };
00053
00054 CCAFEThread(CCAFERunnable* newRunnable)
00055 {
00056 runnable = newRunnable;
00057 };
00063 virtual ~CCAFEThread()
00064 {
00065 join();
00066 }
00067 inline void setRunnable(CCAFERunnable* newRunnable)
00068 {
00069 runnable = newRunnable;
00070 }
00077 virtual void* run();
00078
00079 virtual int start()
00080 {
00081 return pthread_create( &thread_id, NULL, &CCAFEThread::startThread, this);
00082 }
00083 virtual void* join();
00093 inline static void enableInterrupts() {
00094 pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL );
00095 pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED, NULL );
00096 }
00100 inline static void disableInterrupts() {
00101 pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, NULL );
00102 }
00114 inline int interrupt()
00115 {
00116 return pthread_cancel( thread_id );
00117 }
00118 };
00119
00120
00121
00122
00130 class CCAFEMutex {
00131 private:
00132 friend class CCAFECondition;
00133 pthread_mutex_t mutex_id;
00134 int unlock_internal()
00135 {
00136 return pthread_mutex_unlock(&mutex_id);
00137 };
00138 public:
00139
00140 CCAFEMutex();
00141
00142 ~CCAFEMutex()
00143 {
00144 int error = pthread_mutex_destroy(&mutex_id);
00145 if (error == EBUSY)
00146 {
00147
00148 pthread_mutex_unlock(&mutex_id);
00149 pthread_mutex_destroy(&mutex_id);
00150 }
00151 };
00152
00154 inline int lock()
00155 {
00156 return pthread_mutex_lock(&mutex_id);
00157 };
00158
00162 inline int tryLock()
00163 {
00164 return pthread_mutex_trylock(&mutex_id);
00165 };
00166
00168 inline int unlock()
00169 {
00170
00171
00172
00173 return unlock_internal();
00174 };
00175 };
00176
00183 class CCAFEMutexLock {
00184 private:
00185 CCAFEMutex* mutex;
00186 public:
00187
00189 CCAFEMutexLock(CCAFEMutex* newMutex)
00190 {
00191 mutex = newMutex;
00192 if (mutex) mutex->lock();
00193 };
00194
00196 ~CCAFEMutexLock() { if (mutex) mutex->unlock(); };
00197
00198 };
00199
00211 class CCAFECondition {
00212 private:
00213 pthread_cond_t cond;
00214
00215 public:
00216
00217 CCAFECondition() {
00218 pthread_cond_init(&cond, NULL);
00219 }
00220
00221 ~CCAFECondition() { pthread_cond_destroy (&cond); };
00222
00226 inline int wait(CCAFEMutex* mutex) { return pthread_cond_wait (&cond, &(mutex->mutex_id)); };
00227
00230 inline int signal() { return pthread_cond_signal (&cond); };
00231
00234 inline int broadcast() { return pthread_cond_broadcast (&cond); };
00235 };
00236
00237
00238 class CCAFEReadLock;
00239 class CCAFEWriteLock;
00240
00249 class CCAFEReadWriteMutex {
00250 private:
00251 pthread_rwlock_t rwlock;
00252 public:
00253
00254 CCAFEReadWriteMutex()
00255 {
00256 pthread_rwlock_init(&rwlock, NULL);
00257 };
00258
00259 protected:
00260 friend class CCAFEReadLock;
00261 friend class CCAFEWriteLock;
00262
00264 void readLock(){
00265 pthread_rwlock_rdlock(&rwlock);
00266 }
00267
00269 void writeLock() {
00270 pthread_rwlock_wrlock(&rwlock);
00271 }
00272
00273 void unlock() {
00274 pthread_rwlock_unlock(&rwlock);
00275 }
00276
00277 };
00278
00280 class CCAFEReadLock {
00281 private:
00282 CCAFEReadWriteMutex* rw_mut;
00283 public:
00284
00285 CCAFEReadLock( CCAFEReadWriteMutex* rw_mut_ ) {
00286 rw_mut = rw_mut_;
00287 rw_mut->readLock();
00288 }
00289
00290 ~CCAFEReadLock() {
00291 rw_mut->unlock();
00292 }
00293 };
00295 class CCAFEWriteLock {
00296 private:
00297 CCAFEReadWriteMutex* rw_mut;
00298 public:
00299
00300 CCAFEWriteLock( CCAFEReadWriteMutex* rw_mut_ ) {
00301 rw_mut = rw_mut_;
00302 rw_mut->writeLock();
00303 }
00304
00305 ~CCAFEWriteLock() {
00306 rw_mut->unlock();
00307 }
00308 };
00309
00317 class CCAFESemaphore {
00318 private:
00319 sem_t sem_id;
00320 public:
00322 CCAFESemaphore(){
00323 sem_init(&sem_id, 0 , 0 );
00324 }
00325
00326 ~CCAFESemaphore(){
00327 sem_destroy(&sem_id);
00328 }
00329
00332 int wait() {
00333 return sem_wait(&sem_id);
00334 }
00335
00337 int tryWait() {
00338 return sem_trywait(&sem_id);
00339 }
00340
00342 int post() {
00343 return sem_post(&sem_id);
00344 }
00345 };
00346
00347 class CCAFEQueueLock;
00348
00349 class CCAFEThreadSafeQueue {
00350 private:
00351 jcpp::Vector* list;
00352 CCAFECondition condition;
00353 CCAFEMutex mutex;
00354 boolean amShutdown;
00355
00356 protected:
00357 friend class CCAFEQueueLock;
00362 void lockQueue() {
00363 mutex.lock();
00364 };
00365
00367 void unlockQueue() {
00368 mutex.unlock();
00369 };
00370
00371 public:
00372 CCAFEThreadSafeQueue(jcpp::Vector* newList) { list = newList; amShutdown = FALSE;};
00373
00374
00375 ~CCAFEThreadSafeQueue() {};
00376
00379 void addLast(jcpp::Object* elem) {
00380 CCAFEMutexLock lock(&mutex);
00381 list->add(elem);
00382 condition.signal();
00383 };
00384
00387 jcpp::Object* blockRemoveFirst() {
00388 CCAFEMutexLock lock(&mutex);
00389 while (((list->size()) <1) && (!amShutdown)) {
00390
00391
00392 condition.wait(&mutex);
00393 }
00394 return list->remove(0);
00395 };
00396
00399 jcpp::Object* nonblockRemoveFirst() {
00400 CCAFEMutexLock lock(&mutex);
00401 if (list->size() < 1)
00402 return NULL;
00403 else
00404 return list->remove(0);
00405 };
00406
00408 boolean remove(jcpp::Object* elem) {
00409 CCAFEMutexLock lock(&mutex);
00410 return list->removeElement(elem);
00411 };
00412
00413 int size() {
00414 CCAFEMutexLock lock(&mutex);
00415 return list->size();
00416 };
00417
00418 void ensureCapacity(int cap) {
00419 CCAFEMutexLock lock(&mutex);
00420 list->ensureCapacity(cap);
00421 };
00422
00423 jcpp::Object* elementAt(int index) {
00424 CCAFEMutexLock lock(&mutex);
00425 return list->elementAt(index);
00426
00427 };
00428
00431 void removeAllElements() {
00432 CCAFEMutexLock lock(&mutex);
00433 list->removeAllElements();
00434 };
00435
00436 boolean contains(jcpp::Object* obj) {
00437 CCAFEMutexLock lock(&mutex);
00438 return list->contains(obj);
00439 }
00440
00442 void shutdown()
00443 {
00444 CCAFEMutexLock lock(&mutex);
00445 amShutdown = TRUE;
00446 condition.broadcast();
00447 }
00448 };
00449
00450
00451
00452
00453 class CCAFEQueueLock {
00454 private:
00455 CCAFEThreadSafeQueue* queue;
00456 public:
00457
00458 CCAFEQueueLock(CCAFEThreadSafeQueue* queue_) {
00459 queue = queue_;
00460 queue->lockQueue();
00461 }
00462
00463 ~CCAFEQueueLock() {
00464 queue->unlockQueue();
00465 };
00466 };
00467
00468
00469
00470
00471
00472
00473 class CCAFEThreadPool;
00474
00475
00486 class CCAFEThreadPoolThread : public CCAFEThread {
00487 private:
00488 CCAFEThreadPool* pool;
00489 CCAFESemaphore done;
00490 CCAFESemaphore start_requested;
00491 boolean shutdown;
00492 void* returnValue;
00493
00494 protected:
00495 friend class CCAFEThreadPool;
00496 void initialize();
00497 void stop();
00498 void* joinFinal();
00499
00500 public:
00501
00502 CCAFEThreadPoolThread(CCAFEThreadPool* newPool)
00503 { pool = newPool; shutdown = FALSE; };
00506 ~CCAFEThreadPoolThread();
00507
00510 void* run();
00511
00513 int start();
00514
00517 void* join();
00518 };
00519
00521 class CCAFEThreadPool {
00522 private:
00523 jcpp::Vector vec1;
00524 jcpp::Vector vec2;
00525 CCAFEThreadSafeQueue availableQueue;
00526 CCAFEThreadSafeQueue runningQueue;
00527 CCAFEMutex mutex;
00528
00529 CCAFECondition cond;
00530 int maxSize;
00531
00532 protected:
00533 friend class CCAFEThreadPoolThread;
00534
00535
00536 void queueThread(CCAFEThread* thread);
00537
00538 void removeThread(CCAFEThread* thread);
00539
00540 public:
00541 CCAFEThreadPool(int size) :availableQueue(&vec1), runningQueue(&vec2)
00542 {
00543 maxSize = size;
00544 };
00545
00546 CCAFEThread* dequeueThread();
00547
00548 void setSize(int size);
00549
00550 void shutdown();
00551
00552 };
00553
00554 #endif // thread.h seen
00555 #else
00556 extern int ccafe_no_jcpp_util_Thread;
00557 #endif // CCAFE_THREADS