PFUNC
1.0
|
00001 #ifndef PFUNC_COND_HPP 00002 #define PFUNC_COND_HPP 00003 00084 #include <pfunc/config.h> 00085 00090 #if PFUNC_HAVE_FUTEX != 1 00091 00092 #include <pfunc/exception.hpp> 00093 #include <pfunc/mutex.hpp> 00094 00095 #if PFUNC_WINDOWS == 1 00096 #include <Windows.h> 00097 00098 namespace pfunc { namespace detail { 00099 struct cond : public detail::no_copy { 00100 private: 00101 volatile int num_waiters; 00102 int MAX_COUNT; 00103 CRITICAL_SECTION num_waiters_lock; 00104 HANDLE thread_queue; 00105 HANDLE all_awake_event; 00106 BOOL was_broadcast; 00108 public: 00112 cond () { 00113 num_waiters = 0; 00114 MAX_COUNT = 1024; 00115 was_broadcast = FALSE; 00116 thread_queue = CreateSemaphore (NULL, /* No Security */ 00117 0, /* Initial count */ 00118 MAX_COUNT, /* Maximum count */ 00119 NULL); /* Anonymous thread_queue */ 00120 if (NULL == thread_queue) 00121 #if PFUNC_USE_EXCEPTIONS == 1 00122 throw exception_generic_impl 00123 ("pfunc::detail::cond::cond::CreateSemaphore at " FILE_AND_LINE(), 00124 "Error in initializing the thread_queue", 00125 GetLastError()); 00126 #else 00127 return; 00128 #endif 00129 00130 InitializeCriticalSection (&num_waiters_lock); 00131 00132 all_awake_event = CreateEvent (NULL, /* No security */ 00133 FALSE,/* auto-reset event */ 00134 FALSE,/* non-signaled initially */ 00135 NULL);/* No name */ 00136 00137 if (NULL == all_awake_event) 00138 #if PFUNC_USE_EXCEPTIONS == 1 00139 throw exception_generic_impl 00140 ("pfunc::detail::cond::cond::CreateEvent at " FILE_AND_LINE(), 00141 "Error in initializing the event", 00142 GetLastError()); 00143 #else 00144 return; 00145 #endif 00146 } 00147 00151 ~cond () { 00152 num_waiters = 0; 00153 DeleteCriticalSection (&num_waiters_lock); 00154 if (TRUE != CloseHandle (thread_queue) || TRUE != CloseHandle (all_awake_event)) 00155 #if PFUNC_USE_EXCEPTIONS == 1 00156 throw exception_generic_impl 00157 ("pfunc::detail::cond::~cond::CloseHandle at " FILE_AND_LINE(), 00158 "Error in destroying the condition variable", 00159 GetLastError()); 00160 #else 00161 return; 00162 #endif 00163 } 00164 00170 void wait (mutex& mtx) { 00171 HANDLE actual_mutex = mtx.get_internal_mutex(); 00172 BOOL last_waiter = FALSE; 00173 00174 /* Increment num_waiters */ 00175 EnterCriticalSection (&num_waiters_lock); 00176 ++num_waiters; 00177 LeaveCriticalSection (&num_waiters_lock); 00178 00179 /* Atomically release the mutex and wait on the thread_queue until signal/broadcast */ 00180 error_code_type error = SignalObjectAndWait (actual_mutex, /* Mutex to release */ 00181 thread_queue, /* Sempahore to wait on */ 00182 INFINITE, /* Amount of time to wait */ 00183 FALSE); /* Is not alertable */ 00184 if (WAIT_OBJECT_0 != error) 00185 #if PFUNC_USE_EXCEPTIONS == 1 00186 throw exception_generic_impl 00187 ("pfunc::detail::cond::wait::SignalObjectAndWait at " FILE_AND_LINE(), 00188 "Error on atomically releasing the mutex and waiting on the thread_queue", 00189 GetLastError()); 00190 #else 00191 return; 00192 #endif 00193 00194 /* Post-processing */ 00195 EnterCriticalSection (&num_waiters_lock); 00196 --num_waiters; 00197 last_waiter = (was_broadcast && num_waiters==0); 00198 LeaveCriticalSection (&num_waiters_lock); 00199 00200 /* Check if we are the last one out the door in a broadcast. If so, we should let 00201 other threads proceed */ 00202 if (last_waiter) { 00203 error = SignalObjectAndWait (all_awake_event, 00204 actual_mutex, 00205 INFINITE, 00206 FALSE); 00207 if (WAIT_OBJECT_0 != error) 00208 #if PFUNC_USE_EXCEPTIONS == 1 00209 throw exception_generic_impl 00210 ("pfunc::detail::cond::wait::SignalObjectAndWait at " FILE_AND_LINE(), 00211 "Error while checking we are the last thread out of the broadcast", 00212 GetLastError()); 00213 #else 00214 return; 00215 #endif 00216 } else { 00217 error = WaitForSingleObject (actual_mutex, INFINITE); 00218 if (WAIT_OBJECT_0 != error) 00219 #if PFUNC_USE_EXCEPTIONS == 1 00220 throw exception_generic_impl 00221 ("pfunc::detail::cond::wait::WaitForSingleObject at " FILE_AND_LINE(), 00222 "Error while waiting for the mutex", 00223 GetLastError()); 00224 #else 00225 return; 00226 #endif 00227 } 00228 } 00229 00233 void signal () { 00234 BOOL waiters_present = FALSE; 00235 00236 EnterCriticalSection (&num_waiters_lock); 00237 waiters_present = num_waiters>0; 00238 LeaveCriticalSection (&num_waiters_lock); 00239 00240 /* If there are waiters, then release one of them */ 00241 if (waiters_present) { 00242 if (FALSE == ReleaseSemaphore (thread_queue, 1, 0)) 00243 #if PFUNC_USE_EXCEPTIONS == 1 00244 throw exception_generic_impl 00245 ("pfunc::detail::cond::signal::ReleaseSemaphore at " FILE_AND_LINE(), 00246 "Error releasing the thread_queue", 00247 GetLastError()); 00248 #else 00249 return; 00250 #endif 00251 } 00252 } 00253 00257 void broadcast () { 00258 BOOL waiters_present = FALSE; 00259 00260 /*This is needed to ensure that num_waiters and was_broadcast are 00261 * consistent */ 00262 EnterCriticalSection (&num_waiters_lock); 00263 if (num_waiters>0) { 00264 was_broadcast = 1; 00265 waiters_present = TRUE; 00266 } 00267 00268 if (waiters_present) { 00269 /* Wake up everyone */ 00270 if (FALSE == ReleaseSemaphore (thread_queue, num_waiters, 0)) 00271 #if PFUNC_USE_EXCEPTIONS 00272 throw exception_generic_impl 00273 ("pfunc::detail::cond::broadcast::ReleaseSemaphore at " FILE_AND_LINE(), 00274 "Error releasing the thread_queue", 00275 GetLastError()); 00276 #else 00277 return; 00278 #endif 00279 00280 LeaveCriticalSection (&num_waiters_lock); 00281 00282 /* Wait for all awakened threads to acquire the counting thread_queue */ 00283 if (WAIT_OBJECT_0 != WaitForSingleObject (all_awake_event, INFINITE)) 00284 #if PFUNC_USE_EXCEPTIONS == 1 00285 throw exception_generic_impl 00286 ("pfunc::detail::cond::broadcast::WaitForSingleObject at" FILE_AND_LINE(), 00287 "Error waiting for the broadcast to go through", 00288 GetLastError()); 00289 #else 00290 return; 00291 #endif 00292 00296 was_broadcast = FALSE; 00297 } else { 00298 LeaveCriticalSection (&num_waiters_lock); 00299 } 00300 } 00301 }; 00302 } /* namespace detail */ } /* namespace pfunc */ 00303 00304 #elif PFUNC_HAVE_PTHREADS == 1 00305 #include <pthread.h> 00306 00307 namespace pfunc { namespace detail { 00308 00309 struct cond : public detail::no_copy { 00310 private: 00311 pthread_cond_t cnd; 00313 public: 00317 cond () { 00318 PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_init (&cnd, NULL); 00319 #if PFUNC_USE_EXCEPTIONS == 1 00320 if (error) 00321 throw exception_generic_impl 00322 ("pfunc::detail::cond::cond::pthread_cond_init at " FILE_AND_LINE(), 00323 "Error in initializing the condition variable", 00324 error); 00325 #endif 00326 } 00327 00331 ~cond () { 00332 PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_destroy (&cnd); 00333 #if PFUNC_USE_EXCEPTIONS == 1 00334 if (error) 00335 throw exception_generic_impl 00336 ("pfunc::detail::cond::~cond::pthread_cond_destroy at " FILE_AND_LINE(), 00337 "Error in destroying the condition variable", 00338 error); 00339 #endif 00340 } 00341 00347 void wait (mutex& mtx) { 00348 pthread_mutex_t& actual_mutex = mtx.get_internal_mutex(); 00349 PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_wait (&cnd, &actual_mutex); 00350 #if PFUNC_USE_EXCEPTIONS == 1 00351 if (error) 00352 throw exception_generic_impl 00353 ("pfunc::detail::cond::wait::pthread_cond_wait at " FILE_AND_LINE(), 00354 "Error in waiting for the condition variable", 00355 error); 00356 #endif 00357 } 00358 00362 void signal () { 00363 PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_signal (&cnd); 00364 #if PFUNC_USE_EXCEPTIONS == 1 00365 if (error) 00366 throw exception_generic_impl 00367 ("pfunc::detail::cond::signal::pthread_cond_signal at " FILE_AND_LINE(), 00368 "Error in signaling the condition variable", 00369 error); 00370 #endif 00371 } 00372 00376 void broadcast () { 00377 PFUNC_CAPTURE_RETURN_VALUE(error) pthread_cond_broadcast (&cnd); 00378 #if PFUNC_USE_EXCEPTIONS == 1 00379 if (error) 00380 throw exception_generic_impl 00381 ("pfunc::detail::cond::broadcast::pthread_cond_signal at " FILE_AND_LINE(), 00382 "Error in broadcasting the condition variable", 00383 error); 00384 #endif 00385 } 00386 }; 00387 } /* namespace detail */ } /* namespace pfunc */ 00388 00389 #else 00390 #error "Platform not supported" 00391 #endif 00392 00393 #endif /* if PFUNC_HAVE_FUTEX == 1 */ 00394 00395 #endif // PFUNC_COND_HPP