scheduler.hpp
Go to the documentation of this file.
00001 
00005 /* Copyright (c) 2005-2009,2011 Taneli Kalvas, Jan Sarén. All rights reserved.
00006  *
00007  * You can redistribute this software and/or modify it under the terms
00008  * of the GNU General Public License as published by the Free Software
00009  * Foundation; either version 2 of the License, or (at your option)
00010  * any later version.
00011  * 
00012  * This library is distributed in the hope that it will be useful, but
00013  * WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
00015  * General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with this library (file "COPYING" included in the package);
00019  * if not, write to the Free Software Foundation, Inc., 51 Franklin
00020  * Street, Fifth Floor, Boston, MA 02110-1301 USA
00021  * 
00022  * If you have questions about your rights to use or distribute this
00023  * software, please contact Berkeley Lab's Technology Transfer
00024  * Department at TTD@lbl.gov. Other questions, comments and bug
00025  * reports should be sent directly to the author via email at
00026  * taneli.kalvas@jyu.fi.
00027  * 
00028  * NOTICE. This software was developed under partial funding from the
00029  * U.S.  Department of Energy.  As such, the U.S. Government has been
00030  * granted for itself and others acting on its behalf a paid-up,
00031  * nonexclusive, irrevocable, worldwide license in the Software to
00032  * reproduce, prepare derivative works, and perform publicly and
00033  * display publicly.  Beginning five (5) years after the date
00034  * permission to assert copyright is obtained from the U.S. Department
00035  * of Energy, and subject to any subsequent five (5) year renewals,
00036  * the U.S. Government is granted for itself and others acting on its
00037  * behalf a paid-up, nonexclusive, irrevocable, worldwide license in
00038  * the Software to reproduce, prepare derivative works, distribute
00039  * copies to the public, perform publicly and display publicly, and to
00040  * permit others to do so.
00041  */
00042 
00043 #ifndef SCHEDULER_HPP
00044 #define SCHEDULER_HPP 1
00045 
00046 
00047 #include <pthread.h>
00048 #include <stdint.h>
00049 #include <iostream>
00050 #include <vector>
00051 #include <deque>
00052 #include <time.h>
00053 #include "comptime.hpp"
00054 
00055 
00056 //#define SCHEDULER_DEBUG 1
00057 
00058 
00059 //pthread_mutex_t cout_mutex = PTHREAD_MUTEX_INITIALIZER;
00060 
00061 
00086 template <class Solv, class Prob, class Err>
00087 class Scheduler {
00088 
00089     class Consumer {
00090 
00091         /*
00092         enum consumer_status_e {
00093             CONSUMER_CREATED = 0,
00094             CONSUMER_RUNNING,
00095             CONSUMER_FINISHED
00096         };
00097         */
00098 
00099         //pthread_mutex_t      _mutex;             //!< \brief Mutex for active check
00100         pthread_t            _thread;            
00101         Solv                *_solver;            
00102         Scheduler           *_scheduler;         
00103         //struct timeval       _t0;
00104         //std::vector<struct timeval> _t;
00105     
00106         void *consumer_main( void ) {
00107             //struct timeval t;
00108             
00109 #ifdef SCHEDULER_DEBUG
00110             std::cout << "Consumer main entrance\n";
00111 #endif
00112             //pthread_mutex_lock( &_mutex );
00113             //_status = CONSUMER_RUNNING;
00114             //pthread_mutex_unlock( &_mutex );
00115 
00116             Prob *p;
00117             uint32_t pi;
00118             while( (p = _scheduler->get_next_problem( pi )) ) {
00119                 try {
00120                     //gettimeofday( &t, NULL );
00121                     //_t.push_back( t );
00122                     (*_solver)( p, pi );
00123                     //gettimeofday( &t, NULL );
00124                     //_t.push_back( t );
00125                 } catch( Err e ) {
00126                     //std::cout << "on_error\n";
00127                     // Handle error and stop solving
00128                     _scheduler->on_error( e, pi );
00129                     break;
00130                 };
00131                 _scheduler->inc_solved_problem();
00132             }
00133 
00134 #ifdef SCHEDULER_DEBUG
00135             std::cout << "Exiting consumer\n";
00136 #endif
00137             //pthread_mutex_lock( &_mutex );
00138             //_status = CONSUMER_FINISHED;
00139             //pthread_mutex_unlock( &_mutex );
00140             return( NULL );
00141         }
00142     
00143     public:
00144 
00145         static void *consumer_entry( void *data ) {
00146             Consumer *consumer = (Consumer *)data;
00147             return( consumer->consumer_main() );
00148         }
00149 
00150         Consumer( Solv *solver, Scheduler *scheduler ) : _solver(solver), _scheduler(scheduler) { 
00151 
00152             //pthread_mutex_init( &_mutex, NULL );
00153 #ifdef SCHEDULER_DEBUG
00154             std::cout << "Consumer constructor\n";
00155 #endif
00156             //gettimeofday( &_t0, NULL );
00157         }
00158 
00159         ~Consumer() {
00160             //pthread_mutex_lock( &cout_mutex );
00161 #ifdef SCHEDULER_DEBUG
00162             std::cout << "Consumer destructor\n";
00163 #endif
00164             //for( size_t a = 0; a < _t.size(); a++ ) {
00165             //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 
00166             //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n";
00167             //a++;
00168             //std::cout << (_t[a].tv_sec-_t0.tv_sec) + 
00169             //(_t[a].tv_usec-_t0.tv_usec)/1e6 << "\n\n\n";
00170             //}
00171             //pthread_mutex_unlock( &cout_mutex );
00172         }
00173 
00174         void run( void ) {
00175             pthread_create( &_thread, NULL, consumer_entry, (void *)this );
00176         }
00177 
00178         void join( void ) {
00179 
00180 #ifdef SCHEDULER_DEBUG
00181             std::cout << "Consumer join\n";
00182 #endif
00183             //pthread_mutex_lock( &_mutex );
00184             //if( _status == CONSUMER_FINISHED ) {
00185             //pthread_mutex_unlock( &_mutex );
00186             //return;
00187             //} else if( _status == CONSUMER_CREATED ) {
00188             //
00189             //}
00190             //pthread_mutex_unlock( &_mutex );
00191             pthread_join( _thread, NULL );
00192         }
00193 
00194     };
00195 
00196 
00197     pthread_mutex_t         _mutex;            
00198     pthread_cond_t          _scheduler_cond;   
00199     pthread_cond_t          _producer_cond;    
00200     pthread_cond_t          _consumer_cond;    
00201 
00202     //size_t                  _problems_in_c;    //!< \brief Total problems in count
00203     //size_t                  _problems_err_c;   //!< \brief Total error problems out count
00204     //std::deque<Prob*>       _problems_out;     //!< \brief Problems already solved
00205 
00206     uint32_t                _read_c;           
00207     uint32_t                _solved_c;         
00208     std::vector<Prob *>    &_problems;         
00209 
00210     pthread_t               _scheduler_thread; 
00211     std::vector<Consumer *> _consumers;        
00212 
00213     bool                    _join;             
00214     bool                    _running;          
00215     bool                    _error;            
00216     bool                    _done;             
00217     bool                    _finish;           
00218     std::vector<Err>        _err;              
00219     std::vector<int32_t>    _eprob;            
00220 
00221 
00228     void on_error( Err &e, uint32_t pi ) {
00229         pthread_mutex_lock( &_mutex );
00230         _err.push_back( e );
00231         _eprob.push_back( pi );
00232         _error = true;
00233         pthread_cond_broadcast( &_scheduler_cond );
00234         pthread_mutex_unlock( &_mutex );
00235     }
00236 
00237 
00240     void inc_solved_problem( void ) {
00241         pthread_mutex_lock( &_mutex );
00242         _solved_c++;
00243         pthread_mutex_unlock( &_mutex );
00244     }
00245 
00252     Prob *get_next_problem( uint32_t &pi ) {
00253 #ifdef SCHEDULER_DEBUG
00254             std::cout << "get_next_problem()\n";
00255 #endif
00256         pthread_mutex_lock( &_mutex );
00257     
00258         if( _done || _error ) {
00259             pthread_mutex_unlock( &_mutex );
00260 #ifdef SCHEDULER_DEBUG
00261             std::cout << "get_next_problem(): Returning NULL\n";
00262 #endif
00263             pi = -1;
00264             return( NULL );
00265         }
00266 
00267         if( _problems.size() == _read_c ) {
00268 #ifdef SCHEDULER_DEBUG
00269             std::cout << "get_next_problem(): No problem to return... waiting\n";
00270 #endif
00271             // Signal producer that problems are spent
00272             pthread_cond_signal( &_scheduler_cond );
00273             while( _problems.size() == _read_c ) {
00274                 // Wait for new problems
00275                 pthread_cond_wait( &_consumer_cond, &_mutex );
00276                 if( _done || _error ) {
00277                     pthread_mutex_unlock( &_mutex );
00278 #ifdef SCHEDULER_DEBUG
00279                     std::cout << "get_next_problem(): Returning NULL\n";
00280 #endif
00281                     pi = -1;
00282                     return( NULL );
00283                 }
00284             }
00285         }
00286 
00287         // Return next problem
00288         pi = _read_c++;
00289         Prob *ret = _problems[pi];
00290 
00291 #ifdef SCHEDULER_DEBUG
00292         std::cout << "get_next_problem(): Returning problem " << pi << "\n";
00293 #endif
00294 
00295         pthread_mutex_unlock( &_mutex );
00296         return( ret );
00297     }
00298 
00299     
00302     void *scheduler_main( void ) {
00303 
00304 #ifdef SCHEDULER_DEBUG
00305         std::cout << "Running scheduler_main()\n";
00306 #endif
00307 
00308         // Start consumer threads
00309         for( size_t a = 0; a < _consumers.size(); a++ )
00310             _consumers[a]->run();
00311 
00312         pthread_mutex_lock( &_mutex );
00313 
00314         while( 1 ) {
00315             // Wait until all consumers are done with all problems or error occurs
00316             while( !(_problems.size() == _solved_c || _done || _error) ) {
00317                 //std::cout << "scheduler_main(): scheduler_cond wait 1\n";
00318                 pthread_cond_wait( &_scheduler_cond, &_mutex );
00319             }
00320 
00321             if( (_finish && _problems.size() == _solved_c) || _done || _error )
00322                 break;
00323 
00324             // Problems temporarily done
00325             pthread_cond_wait( &_scheduler_cond, &_mutex );
00326             //std::cout << "scheduler_main(): prob_in = " << _problems_in_c
00327             //<< " prob_out = " << _problems_out_c << "\n";
00328             //std::cout << "scheduler_main(): scheduler_cond wait 2\n";
00329 
00330             // Signal consumers to wake up
00331             pthread_cond_broadcast( &_consumer_cond );
00332         }
00333 
00334         // Broadcast: done
00335         _done = true;
00336         _running = false;
00337         pthread_cond_broadcast( &_consumer_cond );
00338         pthread_mutex_unlock( &_mutex );
00339 
00340         // Join all consumers
00341         //std::cout << "scheduler_main(): Scheduler waiting in join\n";
00342         for( size_t a = 0; a < _consumers.size(); a++ )
00343             _consumers[a]->join();
00344 
00345         pthread_cond_broadcast( &_producer_cond );
00346         //std::cout << "scheduler_main(): Exiting scheduler\n";
00347         return( NULL );
00348     }
00349 
00350 
00353     static void *scheduler_entry( void *data ) {
00354         Scheduler *scheduler = (Scheduler *)data;
00355         return( scheduler->scheduler_main() );
00356     }
00357 
00358 
00359 public:
00360 
00361 
00366     Scheduler( std::vector<Prob *> &prob ) 
00367         : _read_c(0), _solved_c(0), _problems(prob), _join(false), _running(false), 
00368           _error(false), _done(false), _finish(false) {
00369 
00370         // Initialize pthread objects
00371         pthread_mutex_init( &_mutex, NULL );
00372         pthread_cond_init( &_scheduler_cond, NULL );
00373         pthread_cond_init( &_consumer_cond, NULL );
00374         pthread_cond_init( &_producer_cond, NULL );
00375     }
00376 
00377 
00380     ~Scheduler() {
00381 
00382         // Force exit
00383         _done = true;
00384         finish();
00385 
00386         pthread_mutex_destroy( &_mutex );
00387         pthread_cond_destroy( &_scheduler_cond );
00388         pthread_cond_destroy( &_consumer_cond );
00389         pthread_cond_destroy( &_producer_cond );
00390     }
00391 
00392 
00395     bool is_error( void ) {
00396         // No mutex needed for one bit read
00397         return( _error );
00398     }
00399 
00400 
00403     bool is_running( void ) {
00404         // No mutex needed for one bit read
00405         return( _running );
00406     }
00407 
00408 
00411     uint32_t get_solved_count( void ) {
00412         pthread_mutex_lock( &_mutex );
00413         uint32_t ret = _solved_c;
00414         pthread_mutex_unlock( &_mutex );
00415         return( ret );
00416     }
00417 
00418 
00421     uint32_t get_problem_count( void ) {
00422         pthread_mutex_lock( &_mutex );
00423         uint32_t ret = _problems.size();
00424         pthread_mutex_unlock( &_mutex );
00425         return( ret );
00426     }
00427 
00428 
00437     template <class Cont1, class Cont2>
00438     size_t get_errors( Cont1 &e, Cont2 &pi ) {
00439         pthread_mutex_lock( &_mutex );
00440         size_t r = _err.size();
00441         for( size_t a = 0; a < _err.size(); a++ ) {
00442             e.push_back( _err[a] );
00443             pi.push_back( _eprob[a] );
00444         }
00445         _err.clear();
00446         _eprob.clear();
00447         pthread_mutex_unlock( &_mutex );
00448         return( r );
00449     }    
00450 
00451 
00458     void run( std::vector<Solv *> solv ) {
00459 
00460         // Do nothing if already running
00461         if( _running )
00462             return;
00463 
00464         // Create consumer threads
00465         for( size_t a = 0; a < solv.size(); a++ )
00466             _consumers.push_back( new Consumer( solv[a], this ) );
00467 
00468         _read_c = 0;
00469         _solved_c = 0;
00470         _join = true;
00471         _running = true;
00472         _error = false;
00473         _done = false;
00474         _finish = false;
00475         _err.clear();
00476         _eprob.clear();
00477 
00478         // Create scheduler thread
00479         pthread_create( &_scheduler_thread, NULL, scheduler_entry, (void *)this );
00480     }
00481 
00482 
00485     void lock_mutex( void ) {
00486 
00487         pthread_mutex_lock( &_mutex );
00488     }
00489 
00490     
00493     void unlock_mutex( void ) {
00494 
00495         pthread_cond_broadcast( &_scheduler_cond );     
00496         pthread_mutex_unlock( &_mutex );
00497     }
00498     
00499 
00507     bool force_exit( void ) {
00508 
00509         _done = true;
00510         return( finish() );
00511     }
00512 
00519     bool wait_finish( void ) {
00520 
00521         pthread_mutex_lock( &_mutex );
00522         if( _running ) {
00523             _finish = true;
00524             pthread_cond_broadcast( &_scheduler_cond );
00525 
00526             struct timespec ts;
00527             ibs_clock_gettime( CLOCK_REALTIME, &ts );
00528             ts.tv_sec += 1;
00529             int rc = pthread_cond_timedwait( &_producer_cond, &_mutex, &ts );
00530             if( rc == ETIMEDOUT ) {
00531                 pthread_mutex_unlock( &_mutex );
00532                 return( false );
00533             }
00534         }
00535         pthread_mutex_unlock( &_mutex );
00536         return( true );
00537     }
00538 
00547     bool finish( void ) {
00548 
00549         pthread_mutex_lock( &_mutex );
00550         if( _running ) {
00551             _finish = true;
00552             //std::cout << "finish(): scheduler_cond broadcast\n";
00553             pthread_cond_broadcast( &_scheduler_cond );
00554         
00555             //std::cout << "finish(): producer_cond wait\n";
00556             pthread_cond_wait( &_producer_cond, &_mutex );
00557         }
00558         pthread_mutex_unlock( &_mutex );
00559 
00560         if( _join ) {
00561             // Delete consumer threads
00562             for( size_t a = 0; a < _consumers.size(); a++ )
00563                 delete _consumers[a];
00564             _consumers.clear();
00565 
00566             pthread_join( _scheduler_thread, NULL );
00567             _join = false;
00568         }
00569         
00570         if( _error )
00571             return( false );
00572 
00573         return( true );
00574     }
00575 
00576     friend class Consumer;
00577 };
00578 
00579 
00580 
00581 #endif
00582