PFUNC  1.0
pfunc/taskmgr.hpp
Go to the documentation of this file.
00001 #ifndef PFUNC_TASKMGR_HPP
00002 #define PFUNC_TASKMGR_HPP
00003 
00004   
00010 #include <cstdio>
00011 #include <algorithm>
00012 #include <iterator>
00013 #include <limits>
00014 #include <utility>
00015 #include <map>
00016 
00017 #include <pfunc/pfunc_common.h>
00018 #include <pfunc/pfunc_atomics.h>
00019 #include <pfunc/exception.hpp>
00020 #include <pfunc/mutex.hpp>
00021 #include <pfunc/barrier.hpp>
00022 #include <pfunc/thread.hpp>
00023 
00024 #if PFUNC_USE_PAPI == 1
00025 #include <pfunc/perf.hpp>
00026 #endif
00027 
00028 #include <pfunc/event.hpp>
00029 #include <pfunc/group.hpp>
00030 #include <pfunc/attribute.hpp>
00031 #include <pfunc/task.hpp>
00032 #include <pfunc/trampolines.hpp>
00033 #include <pfunc/task_queue_set.hpp>
00034 #include <pfunc/predicate.hpp>
00035 #include <pfunc/environ.hpp>
00036 
00047 namespace pfunc { namespace detail {
00048 
00063 template <typename SchedPolicyName,
00064           typename Task>
00065 struct taskmgr : public taskmgr_virtual_base  {
00066   typedef Task task; 
00067   typedef typename task::functor functor; 
00068   typedef SchedPolicyName sched_policy_name; 
00069   typedef task_queue_set<sched_policy_name, task> queue_type; 
00070   typedef typename task::attribute attribute; 
00071   typedef typename attribute::priority_type priority_type; 
00072   typedef thread::native_thread_id_type native_thread_id_type; 
00074   typedef regular_predicate_pair<sched_policy_name, task> regular_predicate;
00075   typedef waiting_predicate_pair<sched_policy_name, task> waiting_predicate;
00076   typedef group_predicate_pair<sched_policy_name, task> group_predicate;
00077   typedef typename thread::thread_handle_type thread_handle_type;
00078 
00079   /* Default values for attribute and group */
00080   const attribute default_attribute; 
00081   group default_group; 
00083   private: 
00084   const unsigned int num_queues; 
00085   unsigned int num_threads; 
00086   unsigned int* threads_per_queue; 
00087   queue_type* task_queue; 
00088   thread_handle_type* thread_handles; 
00089   thread_attr** thread_data; 
00090   task* task_cache; 
00091   reroute_function_arg** thread_args; 
00092   volatile unsigned int thread_start_count; 
00093   thread_attr* main_thread_attr; 
00094   thread thread_manager; 
00095   barrier start_up_barrier; 
00096 #if PFUNC_USE_PAPI == 1
00097   long long** perf_event_values; 
00098   int num_events; 
00099   int *event_codes; 
00100 #endif
00101 
00104   struct aligned_bool { 
00105     ALIGN128 volatile bool is_cancelled; 
00106     aligned_bool () : is_cancelled (false) {}
00107     void cancel () { is_cancelled = true; }
00108     bool operator()() const { return is_cancelled; }
00109   };
00110   aligned_bool* thread_state; 
00111   unsigned int task_max_attempts; 
00112   PFUNC_DEFINE_EXCEPT_PTR() 
00117   struct task_completion_predicate {
00118     event<testable_event>& compl_event; 
00123     task_completion_predicate (event<testable_event>& compl_event) :
00124                                 compl_event (compl_event) {}
00125 
00130     bool operator ()() const { return compl_event.test(); }
00131   };
00132 
00133   public:
00145   unsigned int current_thread_id ()  {
00146     unsigned int tid;
00147     PFUNC_START_TRY_BLOCK()
00148     tid = (thread_manager.tls_get ())->get_thread_id ();
00149     PFUNC_END_TRY_BLOCK()
00150     PFUNC_CATCH_AND_RETHROW(taskmgr,current_thread_id)
00151     return tid;
00152   }
00153 
00165   task* current_task_information ()  {
00166     task* tptr;
00167     PFUNC_START_TRY_BLOCK()
00168     tptr = &(task_cache[current_thread_id()]);
00169     PFUNC_END_TRY_BLOCK()
00170     PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_information)
00171     return tptr;
00172   }
00173 
00184   unsigned int current_task_group_rank () {
00185     unsigned int grank;
00186     PFUNC_START_TRY_BLOCK()
00187     grank = task_cache[current_thread_id ()].get_rank ();
00188     PFUNC_END_TRY_BLOCK()
00189     PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_rank)
00190     return grank;
00191   }
00192 
00203   unsigned int current_task_group_size () {
00204     unsigned int gsize;
00205     PFUNC_START_TRY_BLOCK()
00206     gsize = task_cache[current_thread_id ()].get_size ();
00207     PFUNC_END_TRY_BLOCK()
00208     PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_size)
00209     return gsize;
00210   }
00211 
00218   void current_task_group_barrier () {
00219     PFUNC_START_TRY_BLOCK()
00220     task_cache[current_thread_id ()].barrier (*this);
00221     PFUNC_END_TRY_BLOCK()
00222     PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_barrier)
00223   }
00224 
00238   void spawn_task (task& new_task, 
00239                    functor& new_work)  {
00240     PFUNC_START_TRY_BLOCK()
00241     spawn_task (new_task, default_attribute, default_group, new_work);
00242     PFUNC_END_TRY_BLOCK()
00243     PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task)
00244   }
00245 
00259   void spawn_task (task& new_task, 
00260                    const attribute& new_attr,
00261                    functor& new_work)  {
00262     PFUNC_START_TRY_BLOCK()
00263     spawn_task (new_task, new_attr, default_group, new_work);
00264     PFUNC_END_TRY_BLOCK()
00265     PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task)
00266   }
00267 
00281   void spawn_task (task& new_task, 
00282                    const attribute& new_attr, 
00283                    group& new_group,
00284                    functor& new_work)  {
00285     PFUNC_START_TRY_BLOCK()
00286     new_task.set_attr (new_attr);
00287     new_task.set_group (&new_group);
00288     new_task.set_func (&new_work);
00289     new_task.reset_completion (new_attr.get_num_waiters());
00290     unsigned int task_queue_number = new_attr.get_queue_number();
00291 
00292     if (QUEUE_CURRENT_THREAD == task_queue_number) /* current thread's queue*/
00293       task_queue_number=(thread_manager.tls_get())->get_task_queue_number();
00294     
00295     task_queue->put (task_queue_number, &new_task);
00296     PFUNC_END_TRY_BLOCK()
00297     PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task)
00298   }
00299 
00313   void spawn_task (void* new_task, 
00314                    void* new_work)  {
00315     PFUNC_START_TRY_BLOCK()
00316     spawn_task (*(static_cast<task*>(new_task)),
00317                 default_attribute,
00318                 default_group,
00319                 *(static_cast<functor*>(new_work)));
00320     PFUNC_END_TRY_BLOCK()
00321     PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task)
00322   }
00323 
00338   void spawn_task (void* new_task, 
00339                    void* new_attr, 
00340                    void* new_work)  {
00341     PFUNC_START_TRY_BLOCK()
00342     spawn_task (*(static_cast<task*>(new_task)),
00343                 *(static_cast<attribute*>(new_attr)),
00344                 default_group,
00345                 *(static_cast<functor*>(new_work)));
00346     PFUNC_END_TRY_BLOCK()
00347     PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task)
00348   }
00349 
00365   void spawn_task (void* new_task, 
00366                    void* new_attr, 
00367                    void* new_group,
00368                    void* new_work)  {
00369     PFUNC_START_TRY_BLOCK()
00370     spawn_task (*(static_cast<task*>(new_task)),
00371                 *(static_cast<attribute*>(new_attr)),
00372                 *(static_cast<group*>(new_group)),
00373                 *(static_cast<functor*>(new_work)));
00374     PFUNC_END_TRY_BLOCK()
00375     PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task)
00376   }
00377 
00387   taskmgr (const unsigned int& num_queues, 
00388            const unsigned int* thds_per_queue,
00389 #if PFUNC_USE_PAPI == 1
00390            const virtual_perf_data& perf_data,
00391 #endif
00392            const unsigned int** affinity = NULL)  :
00393                       default_attribute (true /* is nested */, 
00394                                          false /* grouped */),
00395                       default_group () /* default constructor */,
00396                       num_queues (num_queues),
00397                       num_threads (0),
00398                       threads_per_queue (NULL),
00399                       task_queue(NULL),
00400                       thread_handles (NULL),
00401                       thread_data (NULL),
00402                       thread_args (NULL),
00403                       thread_start_count (0),
00404 #if PFUNC_USE_PAPI == 1
00405                       perf_event_values (NULL),
00406 #endif
00407                       thread_state (NULL),
00408                       task_max_attempts (2000000)
00409                       PFUNC_EXCEPT_PTR_INIT() {
00410     PFUNC_START_TRY_BLOCK()
00411     /* Allocate memory for threads_per_queue */
00412     threads_per_queue = new unsigned int[num_queues];
00413     
00414     /* Copy-in threads_per_queue and calculate the number of threads */
00415     for (unsigned int i=0; i<num_queues; ++i) {
00416       threads_per_queue[i] = thds_per_queue[i];
00417       num_threads += threads_per_queue[i];
00418     }
00419 
00420     /* Set the ID of the main thread to be num_threads */
00421     main_thread_attr = new thread_attr (PFUNC_STACK_MAX,/* main thread stack */
00422                                         num_threads, /* main thread ID number */
00423                                         0,
00424                                         0), /* put main thread jobs on Q 0 */
00425 
00426 #if PFUNC_HAVE_TLS == 1
00427     /* Allocate memory for the thread data */
00428     thread_manager.initialize (num_threads+1/*for the main thread*/);
00429     pfunc_thread_self_id = num_threads; /*for the main thread*/
00430 #endif
00431     
00432     /* Allocate memory for the Queues */
00433     task_queue = new queue_type(num_queues);
00434     
00435     /* Allocate memory to hold the handles */
00436     thread_handles = new thread_handle_type [num_threads];
00437     
00438     /* Allocate memory to hold the thread_data */
00439     thread_data = new thread_attr*[num_threads];
00440     
00441     /* Allocate memory to hold the arguments to the thread function */
00442     thread_args = new reroute_function_arg*[num_threads];
00443 
00444     /* Allocate memory to hold the tasks. This is used for the cache */
00445     task_cache = new task[num_threads];
00446     
00447     /* Allocate memory for the thread_state */
00448     thread_state = new aligned_bool [num_threads];
00449 
00450     /* Add the main thread's attribute to TLS */
00451     thread_manager.tls_set (main_thread_attr);
00452 
00453     /* set the barrier */
00454     start_up_barrier.initialize (num_threads);
00455 
00456     /* If PERF has been defined, then so be it */
00457 #if PFUNC_USE_PAPI == 1
00458     num_events = perf_data.get_num_events ();
00459     event_codes = perf_data.get_events ();
00460     perf_event_values = perf_data.get_event_storage ();
00461 
00462     if (!perf::initialize (event_codes, num_events))
00463       printf ("Could not initialize performance counting\n");
00464 #endif
00465 
00466     /* Now to create threads */
00467     int index = -1;
00468 
00469     for (unsigned int i=0; i<num_queues; ++i) {
00470       for (unsigned int j=0; j<threads_per_queue[i]; ++j) {
00471         ++index;
00472         thread_data[index] = new thread_attr 
00473                                    (PFUNC_STACK_MAX, /* stack_size */
00474                                     index, /* Thread ID */
00475                                     (NULL==affinity)? PFUNC_NO_AFFINITY:
00476                                                       affinity [i][j], 
00477                                     i); /* Queue Number*/
00478      
00479         thread_args[index]=new reroute_function_arg(this, thread_data[index]);
00480         
00481         thread_manager.create_thread (thread_handles[index], /* handle */
00482                                thread_data[index], /* attributes */
00483                                reroute_function, /* trampoline function */
00484                                thread_args[index]); /* arg */
00485       }
00486     }
00487 
00488     /* Make sure that we do not exit this function until all the threads */
00489     while (thread_start_count != static_cast<unsigned int> (num_threads));
00490     PFUNC_END_TRY_BLOCK()
00491     PFUNC_CATCH_AND_RETHROW(taskmgr,taskmgr)
00492   }
00493 
00497   virtual ~taskmgr ()  {
00498     PFUNC_START_TRY_BLOCK()
00499 
00500     
00501     for (unsigned int i=0; i<num_threads; ++i) thread_state[i].cancel ();
00502 
00504     for (unsigned int i=0; i<num_threads; ++i) 
00505       thread_manager.join_thread (thread_handles[i]);
00506 
00507     for (unsigned int i=0; i<num_threads; ++i) {
00508       delete thread_data[i];
00509       delete thread_args[i];
00510     }
00511 
00512     delete task_queue;
00513     delete [] thread_handles;
00514     delete [] thread_data;
00515     delete [] task_cache;
00516     delete [] thread_args;
00517     delete [] threads_per_queue;
00518     delete [] thread_state;
00519     delete main_thread_attr;
00520 
00521     PFUNC_EXCEPT_PTR_CLEAR()
00522     PFUNC_END_TRY_BLOCK()
00523     PFUNC_CATCH_AND_RETHROW(taskmgr,~taskmgr)
00524   }
00525 
00531   void set_max_attempts (const unsigned int& max_attempts) {
00532     task_max_attempts = max_attempts;
00533   }
00534 
00540   unsigned int get_max_attempts () const {
00541     return task_max_attempts;
00542   }
00543 
00559   template <typename CompletionPredicate, typename TaskPredicate>
00560   task* get_task (const CompletionPredicate& completion_pred,
00561                   const unsigned int& max_attempts,
00562                   const unsigned int& queue_number,
00563                   const TaskPredicate& task_pred) {
00564     task* return_value = NULL;
00565 
00566     PFUNC_START_TRY_BLOCK()
00567 
00568     unsigned int num_attempts = max_attempts;
00579     do {
00580       while (!completion_pred() && (0<num_attempts--)) {
00581         if (NULL != 
00582             (return_value = task_queue->get (queue_number, task_pred))) break;
00583       }
00584       if (completion_pred() || NULL!=return_value) break; 
00585       else num_attempts = (0==(max_attempts/2))? 1: (max_attempts/2);
00586       pfunc::detail::thread::yield();
00587     } while (true);
00588 
00589     PFUNC_END_TRY_BLOCK()
00590     PFUNC_CATCH_AND_RETHROW(taskmgr,get_task)
00591 
00592     return return_value;
00593   }
00594 
00608   void operator()(void* _my_attr) {
00609 
00610     PFUNC_START_TRY_BLOCK()
00611 
00612     /* Get information about self */
00613     thread_attr* my_attr = 
00614                 static_cast<thread_attr*> (_my_attr);
00615 
00616     /* Get the information pertaining to this thread */
00617     const unsigned int my_thread_id = my_attr->get_thread_id ();
00618     const unsigned int my_task_queue_number = my_attr->get_task_queue_number ();
00619     const unsigned int my_processor_affinity = my_attr->get_thread_affinity ();
00620 
00621     /* Set my id for thread_attr access from other places */
00622 #if PFUNC_HAVE_TLS == 1
00623     pfunc_thread_self_id = my_thread_id;
00624 #endif
00625 
00626     /* Save the attribute for later */
00627     thread_manager.tls_set (my_attr);
00628 
00629     /* Let us set the processor affinity now */
00630     if (PFUNC_NO_AFFINITY!=my_processor_affinity)
00631       thread_manager.set_affinity (my_processor_affinity);
00632 
00633     /* Wait for all the threads to get here */
00634     start_up_barrier();
00635 
00636     /* If performance has been defined, create events and start counting */
00637 #if PFUNC_USE_PAPI == 1
00638     int my_event_set = perf::create_events ();
00639     if (!perf::start_events (my_event_set)) 
00640       printf ("%u: Could not start counting\n", my_thread_id);
00641 #endif
00642 
00643     /* When everything is set up, signal the main thread */
00644     pfunc_fetch_and_add_32 (&thread_start_count, 1);
00645 
00646     /* WORK LOOP */
00647     task* my_task = NULL;
00648     while (NULL != (my_task = get_task ((thread_state[my_thread_id]),
00649                                         task_max_attempts,
00650                                         my_task_queue_number,
00651                                         regular_predicate(NULL)))) {
00652       task_cache [my_thread_id].shallow_copy(*my_task); /* Set the cache */
00653       my_task->run (); /* Now, lets run the job */
00654       my_task->notify (); /* signal whoever was waiting */
00655     }
00656 
00658 #if PFUNC_USE_PAPI == 1
00659     if (!perf::stop_events (my_event_set, perf_event_values[my_thread_id]))
00660       printf ("%u: Could not stop counting\n", my_thread_id);
00661 #endif
00662 
00663     PFUNC_END_TRY_BLOCK()
00664     PFUNC_CATCH_AND_RETHROW(taskmgr,op_paranthesis)
00665 
00666     // We do not return from here --- so it has to be outside the
00667     // try catch block!
00668     thread_manager.exit_thread ();
00669   }
00670 
00679   void progress_wait (event<testable_event>& compl_event) {
00680 
00681     PFUNC_START_TRY_BLOCK()
00682 
00683     thread_attr& my_data = *(thread_manager.tls_get ());
00684 
00685     const unsigned int my_thread_id = my_data.get_thread_id ();
00686     const unsigned int my_task_queue_number = my_data.get_task_queue_number ();
00687 
00689     task_completion_predicate completion_pred (compl_event);
00690      
00697     if (num_threads == my_thread_id) {
00698       while (!completion_pred()) pfunc::detail::thread::yield ();
00699     } else { 
00700       task current_task;
00701      
00702       current_task.shallow_copy (task_cache[my_thread_id]);
00703      
00704       task* my_task = NULL;
00705       while (NULL != (my_task = get_task (completion_pred,
00706                                           task_max_attempts,
00707                                           my_task_queue_number,
00708                                           waiting_predicate (&current_task)))) {
00709      
00710         /* This task might steal again, set it to be in the cache */
00711         task_cache[my_thread_id].shallow_copy(*my_task);
00712        
00713         /* run the task */
00714         my_task->run (); 
00715        
00716         /* Notify the waiters on the second task */
00717         my_task->notify ();
00718        
00719         /* We have finished stealing, reset the cache again */
00720         task_cache[my_thread_id].shallow_copy(current_task);
00721       }
00722     }
00723 
00724     PFUNC_END_TRY_BLOCK()
00725     PFUNC_CATCH_AND_RETHROW(taskmgr,progress_wait)
00726   }
00727 
00732   void progress_barrier ()  {
00733 
00734     PFUNC_START_TRY_BLOCK()
00735 
00736     thread_attr& my_data = *(thread_manager.tls_get ());
00737     if (num_threads == my_data.get_thread_id ()) return;
00738 
00739     const unsigned int my_thread_id = my_data.get_thread_id ();
00740     const unsigned int my_task_queue_number = my_data.get_task_queue_number ();
00741     
00742     task current_task;
00743 
00744     current_task.shallow_copy (task_cache[my_thread_id]);
00745 
00746     task* my_task = task_queue->get (my_task_queue_number, 
00747                                      group_predicate (&current_task));
00748 
00749     if (NULL == my_task) return;
00750 
00751      /* This task might steal again, set it to be in the cache */
00752     task_cache[my_thread_id].shallow_copy(*my_task);
00753 
00754     /* run the task */
00755     my_task->run (); 
00756 
00757     /* Notify the waiters on the second task */
00758     my_task->notify ();
00759 
00760     /* We have finished stealing, reset the cache again */
00761     task_cache[my_thread_id].shallow_copy(current_task);
00762 
00763     PFUNC_END_TRY_BLOCK()
00764     PFUNC_CATCH_AND_RETHROW(taskmgr,progress_wait)
00765   }
00766 
00770   unsigned int get_num_queues () const { return num_queues; }
00771 
00775   unsigned int get_num_threads () const { return num_threads; }
00776 };
00777 } /* namespace detail */ }  /* namespace pfunc */
00778 #endif // PFUNC_TASKMGR_HPP