Go to the documentation of this file.00001
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045
00046
00047 #include <pthread.h>
00048 #include <stdint.h>
00049 #include <iostream>
00050 #include <vector>
00051 #include <deque>
00052 #include <time.h>
00053 #include "comptime.hpp"
00054
00055
00056
00057
00058
00059
00060
00061
00086 template <class Solv, class Prob, class Err>
00087 class Scheduler {
00088
00089 class Consumer {
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100 pthread_t _thread;
00101 Solv *_solver;
00102 Scheduler *_scheduler;
00103
00104
00105
00106 void *consumer_main( void ) {
00107
00108
00109 #ifdef SCHEDULER_DEBUG
00110 std::cout << "Consumer main entrance\n";
00111 #endif
00112
00113
00114
00115
00116 Prob *p;
00117 uint32_t pi;
00118 while( (p = _scheduler->get_next_problem( pi )) ) {
00119 try {
00120
00121
00122 (*_solver)( p, pi );
00123
00124
00125 } catch( Err e ) {
00126
00127
00128 _scheduler->on_error( e, pi );
00129 break;
00130 };
00131 _scheduler->inc_solved_problem();
00132 }
00133
00134 #ifdef SCHEDULER_DEBUG
00135 std::cout << "Exiting consumer\n";
00136 #endif
00137
00138
00139
00140 return( NULL );
00141 }
00142
00143 public:
00144
00145 static void *consumer_entry( void *data ) {
00146 Consumer *consumer = (Consumer *)data;
00147 return( consumer->consumer_main() );
00148 }
00149
00150 Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) {
00151
00152
00153 #ifdef SCHEDULER_DEBUG
00154 std::cout << "Consumer constructor\n";
00155 #endif
00156
00157 }
00158
00159 ~Consumer() {
00160
00161 #ifdef SCHEDULER_DEBUG
00162 std::cout << "Consumer destructor\n";
00163 #endif
00164
00165
00166
00167
00168
00169
00170
00171
00172 }
00173
00174 void run( void ) {
00175 pthread_create( &_thread, NULL, consumer_entry, (void *)this );
00176 }
00177
00178 void join( void ) {
00179
00180 #ifdef SCHEDULER_DEBUG
00181 std::cout << "Consumer join\n";
00182 #endif
00183
00184
00185
00186
00187
00188
00189
00190
00191 pthread_join( _thread, NULL );
00192 }
00193
00194 };
00195
00196
00197 pthread_mutex_t _mutex;
00198 pthread_cond_t _scheduler_cond;
00199 pthread_cond_t _producer_cond;
00200 pthread_cond_t _consumer_cond;
00201
00202
00203
00204
00205
00206 uint32_t _read_c;
00207 uint32_t _solved_c;
00208 std::vector<Prob *> &_problems;
00209
00210 pthread_t _scheduler_thread;
00211 std::vector<Consumer *> _consumers;
00212
00213 bool _join;
00214 bool _running;
00215 bool _error;
00216 bool _done;
00217 bool _finish;
00218 std::vector<Err> _err;
00219 std::vector<int32_t> _eprob;
00220
00221
00228 void on_error( Err &e, uint32_t pi ) {
00229 pthread_mutex_lock( &_mutex );
00230 _err.push_back( e );
00231 _eprob.push_back( pi );
00232 _error = true;
00233 pthread_cond_broadcast( &_scheduler_cond );
00234 pthread_mutex_unlock( &_mutex );
00235 }
00236
00237
00240 void inc_solved_problem( void ) {
00241 pthread_mutex_lock( &_mutex );
00242 _solved_c++;
00243 pthread_mutex_unlock( &_mutex );
00244 }
00245
00252 Prob *get_next_problem( uint32_t &pi ) {
00253 #ifdef SCHEDULER_DEBUG
00254 std::cout << "get_next_problem()\n";
00255 #endif
00256 pthread_mutex_lock( &_mutex );
00257
00258 if( _done || _error ) {
00259 pthread_mutex_unlock( &_mutex );
00260 #ifdef SCHEDULER_DEBUG
00261 std::cout << "get_next_problem(): Returning NULL\n";
00262 #endif
00263 pi = -1;
00264 return( NULL );
00265 }
00266
00267 if( _problems.size() == _read_c ) {
00268 #ifdef SCHEDULER_DEBUG
00269 std::cout << "get_next_problem(): No problem to return... waiting\n";
00270 #endif
00271
00272 pthread_cond_signal( &_scheduler_cond );
00273 while( _problems.size() == _read_c ) {
00274
00275 pthread_cond_wait( &_consumer_cond, &_mutex );
00276 if( _done || _error ) {
00277 pthread_mutex_unlock( &_mutex );
00278 #ifdef SCHEDULER_DEBUG
00279 std::cout << "get_next_problem(): Returning NULL\n";
00280 #endif
00281 pi = -1;
00282 return( NULL );
00283 }
00284 }
00285 }
00286
00287
00288 pi = _read_c++;
00289 Prob *ret = _problems[pi];
00290
00291 #ifdef SCHEDULER_DEBUG
00292 std::cout << "get_next_problem(): Returning problem " << pi << "\n";
00293 #endif
00294
00295 pthread_mutex_unlock( &_mutex );
00296 return( ret );
00297 }
00298
00299
00302 void *scheduler_main( void ) {
00303
00304 #ifdef SCHEDULER_DEBUG
00305 std::cout << "Running scheduler_main()\n";
00306 #endif
00307
00308
00309 for( size_t a = 0; a < _consumers.size(); a++ )
00310 _consumers[a]->run();
00311
00312 pthread_mutex_lock( &_mutex );
00313
00314 while( 1 ) {
00315
00316 while( !(_problems.size() == _solved_c || _done || _error) ) {
00317
00318 pthread_cond_wait( &_scheduler_cond, &_mutex );
00319 }
00320
00321 if( (_finish && _problems.size() == _solved_c) || _done || _error )
00322 break;
00323
00324
00325 pthread_cond_wait( &_scheduler_cond, &_mutex );
00326
00327
00328
00329
00330
00331 pthread_cond_broadcast( &_consumer_cond );
00332 }
00333
00334
00335 _done = true;
00336 _running = false;
00337 pthread_cond_broadcast( &_consumer_cond );
00338 pthread_mutex_unlock( &_mutex );
00339
00340
00341
00342 for( size_t a = 0; a < _consumers.size(); a++ )
00343 _consumers[a]->join();
00344
00345 pthread_cond_broadcast( &_producer_cond );
00346
00347 return( NULL );
00348 }
00349
00350
00353 static void *scheduler_entry( void *data ) {
00354 Scheduler *scheduler = (Scheduler *)data;
00355 return( scheduler->scheduler_main() );
00356 }
00357
00358
00359 public:
00360
00361
00366 Scheduler( std::vector<Prob *> &prob )
00367 : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false),
00368 _error(false), _done(false), _finish(false) {
00369
00370
00371 pthread_mutex_init( &_mutex, NULL );
00372 pthread_cond_init( &_scheduler_cond, NULL );
00373 pthread_cond_init( &_consumer_cond, NULL );
00374 pthread_cond_init( &_producer_cond, NULL );
00375 }
00376
00377
00380 ~Scheduler() {
00381
00382
00383 _done = true;
00384 finish();
00385
00386 pthread_mutex_destroy( &_mutex );
00387 pthread_cond_destroy( &_scheduler_cond );
00388 pthread_cond_destroy( &_consumer_cond );
00389 pthread_cond_destroy( &_producer_cond );
00390 }
00391
00392
00395 bool is_error( void ) {
00396
00397 return( _error );
00398 }
00399
00400
00403 bool is_running( void ) {
00404
00405 return( _running );
00406 }
00407
00408
00411 uint32_t get_solved_count( void ) {
00412 pthread_mutex_lock( &_mutex );
00413 uint32_t ret = _solved_c;
00414 pthread_mutex_unlock( &_mutex );
00415 return( ret );
00416 }
00417
00418
00421 uint32_t get_problem_count( void ) {
00422 pthread_mutex_lock( &_mutex );
00423 uint32_t ret = _problems.size();
00424 pthread_mutex_unlock( &_mutex );
00425 return( ret );
00426 }
00427
00428
00437 template <class Cont1, class Cont2>
00438 size_t get_errors( Cont1 &e, Cont2 &pi ) {
00439 pthread_mutex_lock( &_mutex );
00440 size_t r = _err.size();
00441 for( size_t a = 0; a < _err.size(); a++ ) {
00442 e.push_back( _err[a] );
00443 pi.push_back( _eprob[a] );
00444 }
00445 _err.clear();
00446 _eprob.clear();
00447 pthread_mutex_unlock( &_mutex );
00448 return( r );
00449 }
00450
00451
00458 void run( std::vector<Solv *> solv ) {
00459
00460
00461 if( _running )
00462 return;
00463
00464
00465 for( size_t a = 0; a < solv.size(); a++ )
00466 _consumers.push_back( new Consumer( solv[a], this ) );
00467
00468 _read_c = 0;
00469 _solved_c = 0;
00470 _join = true;
00471 _running = true;
00472 _error = false;
00473 _done = false;
00474 _finish = false;
00475 _err.clear();
00476 _eprob.clear();
00477
00478
00479 pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this );
00480 }
00481
00482
00485 void lock_mutex( void ) {
00486
00487 pthread_mutex_lock( &_mutex );
00488 }
00489
00490
00493 void unlock_mutex( void ) {
00494
00495 pthread_cond_broadcast( &_scheduler_cond );
00496 pthread_mutex_unlock( &_mutex );
00497 }
00498
00499
00507 bool force_exit( void ) {
00508
00509 _done = true;
00510 return( finish() );
00511 }
00512
00519 bool wait_finish( void ) {
00520
00521 pthread_mutex_lock( &_mutex );
00522 if( _running ) {
00523 _finish = true;
00524 pthread_cond_broadcast( &_scheduler_cond );
00525
00526 struct timespec ts;
00527 ibs_clock_gettime( CLOCK_REALTIME, &ts );
00528 ts.tv_sec += 1;
00529 int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts );
00530 if( rc == ETIMEDOUT ) {
00531 pthread_mutex_unlock( &_mutex );
00532 return( false );
00533 }
00534 }
00535 pthread_mutex_unlock( &_mutex );
00536 return( true );
00537 }
00538
00547 bool finish( void ) {
00548
00549 pthread_mutex_lock( &_mutex );
00550 if( _running ) {
00551 _finish = true;
00552
00553 pthread_cond_broadcast( &_scheduler_cond );
00554
00555
00556 pthread_cond_wait( &_producer_cond, &_mutex );
00557 }
00558 pthread_mutex_unlock( &_mutex );
00559
00560 if( _join ) {
00561
00562 for( size_t a = 0; a < _consumers.size(); a++ )
00563 delete _consumers[a];
00564 _consumers.clear();
00565
00566 pthread_join( _scheduler_thread, NULL );
00567 _join = false;
00568 }
00569
00570 if( _error )
00571 return( false );
00572
00573 return( true );
00574 }
00575
00576 friend class Consumer;
00577 };
00578
00579
00580
00581 #endif
00582