PFUNC  1.0
pfunc/cond.hpp
Go to the documentation of this file.
00001 #ifndef PFUNC_COND_HPP
00002 #define PFUNC_COND_HPP
00003 
00084 #include <pfunc/config.h>
00085 
00090 #if PFUNC_HAVE_FUTEX != 1
00091 
00092 #include <pfunc/exception.hpp>
00093 #include <pfunc/mutex.hpp>
00094 
00095 #if PFUNC_WINDOWS == 1 
00096 #include <Windows.h>
00097 
00098 namespace pfunc { namespace detail {
00099   struct cond : public detail::no_copy {
00100     private:
00101     volatile int num_waiters; 
00102     int MAX_COUNT; 
00103     CRITICAL_SECTION num_waiters_lock; 
00104     HANDLE thread_queue; 
00105     HANDLE all_awake_event; 
00106     BOOL was_broadcast; 
00108     public:
00112     cond () {
00113       num_waiters = 0;
00114       MAX_COUNT = 1024;
00115       was_broadcast = FALSE;
00116       thread_queue = CreateSemaphore (NULL, /* No Security */
00117                                       0, /* Initial count */
00118                                       MAX_COUNT, /* Maximum count */
00119                                       NULL); /* Anonymous thread_queue */
00120       if (NULL == thread_queue)
00121 #if PFUNC_USE_EXCEPTIONS == 1
00122         throw exception_generic_impl 
00123             ("pfunc::detail::cond::cond::CreateSemaphore at " FILE_AND_LINE(),
00124              "Error in initializing the thread_queue",
00125            GetLastError());
00126 #else
00127         return;
00128 #endif
00129 
00130       InitializeCriticalSection (&num_waiters_lock);
00131 
00132       all_awake_event = CreateEvent (NULL, /* No security */
00133                                   FALSE,/* auto-reset event */
00134                                   FALSE,/* non-signaled initially */
00135                                   NULL);/* No name */
00136 
00137       if (NULL == all_awake_event) 
00138 #if PFUNC_USE_EXCEPTIONS == 1
00139         throw exception_generic_impl 
00140             ("pfunc::detail::cond::cond::CreateEvent at " FILE_AND_LINE(),
00141              "Error in initializing the event",
00142            GetLastError());
00143 #else
00144         return;
00145 #endif
00146     }
00147 
00151     ~cond () {
00152       num_waiters = 0;
00153       DeleteCriticalSection (&num_waiters_lock);
00154       if (TRUE != CloseHandle (thread_queue) || TRUE != CloseHandle (all_awake_event)) 
00155 #if PFUNC_USE_EXCEPTIONS == 1
00156         throw exception_generic_impl 
00157             ("pfunc::detail::cond::~cond::CloseHandle at " FILE_AND_LINE(),
00158              "Error in destroying the condition variable",
00159              GetLastError());
00160 #else
00161         return;
00162 #endif
00163     }
00164 
00170     void wait (mutex& mtx) {
00171       HANDLE actual_mutex = mtx.get_internal_mutex();
00172       BOOL last_waiter = FALSE;
00173   
00174       /* Increment num_waiters */
00175       EnterCriticalSection (&num_waiters_lock);
00176       ++num_waiters;
00177       LeaveCriticalSection (&num_waiters_lock);
00178   
00179       /* Atomically release the mutex and wait on the thread_queue until signal/broadcast */
00180       error_code_type error = SignalObjectAndWait (actual_mutex, /* Mutex to release */
00181                                                 thread_queue, /* Sempahore to wait on */
00182                                                 INFINITE, /* Amount of time to wait */
00183                                                 FALSE);  /* Is not alertable */
00184       if (WAIT_OBJECT_0 != error)
00185 #if PFUNC_USE_EXCEPTIONS == 1
00186         throw exception_generic_impl 
00187    ("pfunc::detail::cond::wait::SignalObjectAndWait at " FILE_AND_LINE(),
00188     "Error on atomically releasing the mutex and waiting on the thread_queue",
00189     GetLastError());
00190 #else
00191          return;
00192 #endif
00193   
00194       /* Post-processing */
00195       EnterCriticalSection (&num_waiters_lock);
00196       --num_waiters;
00197       last_waiter = (was_broadcast && num_waiters==0);
00198       LeaveCriticalSection (&num_waiters_lock);
00199   
00200       /* Check if we are the last one out the door in a broadcast. If so, we should let
00201          other threads proceed */
00202       if (last_waiter) { 
00203         error =  SignalObjectAndWait (all_awake_event, 
00204                                       actual_mutex, 
00205                                       INFINITE, 
00206                                       FALSE);
00207       if (WAIT_OBJECT_0 != error)
00208 #if PFUNC_USE_EXCEPTIONS == 1
00209         throw exception_generic_impl 
00210     ("pfunc::detail::cond::wait::SignalObjectAndWait at " FILE_AND_LINE(),
00211      "Error while checking we are the last thread out of the broadcast",
00212      GetLastError());
00213 #else
00214         return;
00215 #endif
00216       } else {
00217         error = WaitForSingleObject (actual_mutex, INFINITE);
00218         if (WAIT_OBJECT_0 != error)
00219 #if PFUNC_USE_EXCEPTIONS == 1
00220           throw exception_generic_impl 
00221       ("pfunc::detail::cond::wait::WaitForSingleObject at " FILE_AND_LINE(),
00222        "Error while waiting for the mutex",
00223        GetLastError());
00224 #else
00225           return;
00226 #endif
00227       }
00228     }
00229 
00233     void signal () {
00234       BOOL waiters_present = FALSE;
00235   
00236       EnterCriticalSection (&num_waiters_lock);
00237       waiters_present = num_waiters>0;
00238       LeaveCriticalSection (&num_waiters_lock);
00239   
00240       /* If there are waiters, then release one of them */
00241       if (waiters_present) {
00242         if (FALSE == ReleaseSemaphore (thread_queue, 1, 0)) 
00243 #if PFUNC_USE_EXCEPTIONS == 1
00244           throw exception_generic_impl 
00245             ("pfunc::detail::cond::signal::ReleaseSemaphore at " FILE_AND_LINE(),
00246              "Error releasing the thread_queue",
00247              GetLastError());
00248 #else
00249           return;
00250 #endif
00251       }
00252     }
00253 
00257     void broadcast () {
00258       BOOL waiters_present = FALSE;
00259   
00260       /*This is needed to ensure that num_waiters and was_broadcast are
00261        * consistent */
00262       EnterCriticalSection (&num_waiters_lock);
00263       if (num_waiters>0) {
00264         was_broadcast = 1;
00265         waiters_present = TRUE;
00266       }
00267   
00268       if (waiters_present) {
00269         /* Wake up everyone */
00270         if (FALSE == ReleaseSemaphore (thread_queue, num_waiters, 0))
00271 #if PFUNC_USE_EXCEPTIONS
00272             throw exception_generic_impl 
00273       ("pfunc::detail::cond::broadcast::ReleaseSemaphore at " FILE_AND_LINE(),
00274        "Error releasing the thread_queue",
00275        GetLastError());
00276 #else
00277             return;
00278 #endif
00279  
00280         LeaveCriticalSection (&num_waiters_lock);
00281  
00282         /* Wait for all awakened threads to acquire the counting thread_queue */
00283         if (WAIT_OBJECT_0 != WaitForSingleObject (all_awake_event, INFINITE)) 
00284 #if PFUNC_USE_EXCEPTIONS == 1
00285             throw exception_generic_impl 
00286     ("pfunc::detail::cond::broadcast::WaitForSingleObject at" FILE_AND_LINE(),
00287       "Error waiting for the broadcast to go through",
00288       GetLastError());
00289 #else
00290             return;
00291 #endif
00292 
00296         was_broadcast = FALSE;
00297       } else {
00298         LeaveCriticalSection (&num_waiters_lock);
00299       }
00300     }
00301   };
00302 } /* namespace detail */ } /* namespace pfunc */
00303 
00304 #elif PFUNC_HAVE_PTHREADS == 1
00305 #include <pthread.h>
00306 
00307 namespace pfunc { namespace detail {
00308 
00309   struct cond : public detail::no_copy {
00310     private:
00311     pthread_cond_t cnd; 
00313     public:
00317     cond () {
00318       PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_init (&cnd, NULL);
00319 #if PFUNC_USE_EXCEPTIONS == 1
00320       if (error) 
00321         throw exception_generic_impl 
00322       ("pfunc::detail::cond::cond::pthread_cond_init at " FILE_AND_LINE(),
00323        "Error in initializing the condition variable",
00324        error);
00325 #endif
00326     }
00327 
00331     ~cond () {
00332       PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_destroy (&cnd);
00333 #if PFUNC_USE_EXCEPTIONS == 1
00334       if (error) 
00335         throw exception_generic_impl
00336             ("pfunc::detail::cond::~cond::pthread_cond_destroy at " FILE_AND_LINE(),
00337              "Error in destroying the condition variable",
00338              error);
00339 #endif
00340     }
00341 
00347     void wait (mutex& mtx) {
00348       pthread_mutex_t& actual_mutex = mtx.get_internal_mutex();
00349       PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_wait (&cnd, &actual_mutex);
00350 #if PFUNC_USE_EXCEPTIONS == 1
00351       if (error) 
00352         throw exception_generic_impl
00353             ("pfunc::detail::cond::wait::pthread_cond_wait at " FILE_AND_LINE(),
00354              "Error in waiting for the condition variable",
00355              error);
00356 #endif
00357     }
00358 
00362     void signal () {
00363       PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_signal (&cnd);
00364 #if PFUNC_USE_EXCEPTIONS == 1
00365       if (error)
00366         throw exception_generic_impl 
00367   ("pfunc::detail::cond::signal::pthread_cond_signal at " FILE_AND_LINE(),
00368    "Error in signaling the condition variable",
00369     error);
00370 #endif
00371     }
00372 
00376     void broadcast () {
00377       PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_broadcast (&cnd);
00378 #if PFUNC_USE_EXCEPTIONS == 1
00379       if (error)
00380         throw exception_generic_impl 
00381    ("pfunc::detail::cond::broadcast::pthread_cond_signal at " FILE_AND_LINE(),
00382     "Error in broadcasting the condition variable",
00383     error);
00384 #endif
00385     }
00386   };
00387 } /* namespace detail */ } /* namespace pfunc */
00388 
00389 #else
00390 #error "Platform not supported"
00391 #endif
00392 
00393 #endif /* if PFUNC_HAVE_FUTEX == 1 */
00394 
00395 #endif // PFUNC_COND_HPP