PFUNC
1.0
|
00001 #ifndef PFUNC_TASKMGR_HPP 00002 #define PFUNC_TASKMGR_HPP 00003 00004 00010 #include <cstdio> 00011 #include <algorithm> 00012 #include <iterator> 00013 #include <limits> 00014 #include <utility> 00015 #include <map> 00016 00017 #include <pfunc/pfunc_common.h> 00018 #include <pfunc/pfunc_atomics.h> 00019 #include <pfunc/exception.hpp> 00020 #include <pfunc/mutex.hpp> 00021 #include <pfunc/barrier.hpp> 00022 #include <pfunc/thread.hpp> 00023 00024 #if PFUNC_USE_PAPI == 1 00025 #include <pfunc/perf.hpp> 00026 #endif 00027 00028 #include <pfunc/event.hpp> 00029 #include <pfunc/group.hpp> 00030 #include <pfunc/attribute.hpp> 00031 #include <pfunc/task.hpp> 00032 #include <pfunc/trampolines.hpp> 00033 #include <pfunc/task_queue_set.hpp> 00034 #include <pfunc/predicate.hpp> 00035 #include <pfunc/environ.hpp> 00036 00047 namespace pfunc { namespace detail { 00048 00063 template <typename SchedPolicyName, 00064 typename Task> 00065 struct taskmgr : public taskmgr_virtual_base { 00066 typedef Task task; 00067 typedef typename task::functor functor; 00068 typedef SchedPolicyName sched_policy_name; 00069 typedef task_queue_set<sched_policy_name, task> queue_type; 00070 typedef typename task::attribute attribute; 00071 typedef typename attribute::priority_type priority_type; 00072 typedef thread::native_thread_id_type native_thread_id_type; 00074 typedef regular_predicate_pair<sched_policy_name, task> regular_predicate; 00075 typedef waiting_predicate_pair<sched_policy_name, task> waiting_predicate; 00076 typedef group_predicate_pair<sched_policy_name, task> group_predicate; 00077 typedef typename thread::thread_handle_type thread_handle_type; 00078 00079 /* Default values for attribute and group */ 00080 const attribute default_attribute; 00081 group default_group; 00083 private: 00084 const unsigned int num_queues; 00085 unsigned int num_threads; 00086 unsigned int* threads_per_queue; 00087 queue_type* task_queue; 00088 thread_handle_type* thread_handles; 00089 thread_attr** thread_data; 00090 task* task_cache; 00091 reroute_function_arg** thread_args; 00092 volatile unsigned int thread_start_count; 00093 thread_attr* main_thread_attr; 00094 thread thread_manager; 00095 barrier start_up_barrier; 00096 #if PFUNC_USE_PAPI == 1 00097 long long** perf_event_values; 00098 int num_events; 00099 int *event_codes; 00100 #endif 00101 00104 struct aligned_bool { 00105 ALIGN128 volatile bool is_cancelled; 00106 aligned_bool () : is_cancelled (false) {} 00107 void cancel () { is_cancelled = true; } 00108 bool operator()() const { return is_cancelled; } 00109 }; 00110 aligned_bool* thread_state; 00111 unsigned int task_max_attempts; 00112 PFUNC_DEFINE_EXCEPT_PTR() 00117 struct task_completion_predicate { 00118 event<testable_event>& compl_event; 00123 task_completion_predicate (event<testable_event>& compl_event) : 00124 compl_event (compl_event) {} 00125 00130 bool operator ()() const { return compl_event.test(); } 00131 }; 00132 00133 public: 00145 unsigned int current_thread_id () { 00146 unsigned int tid; 00147 PFUNC_START_TRY_BLOCK() 00148 tid = (thread_manager.tls_get ())->get_thread_id (); 00149 PFUNC_END_TRY_BLOCK() 00150 PFUNC_CATCH_AND_RETHROW(taskmgr,current_thread_id) 00151 return tid; 00152 } 00153 00165 task* current_task_information () { 00166 task* tptr; 00167 PFUNC_START_TRY_BLOCK() 00168 tptr = &(task_cache[current_thread_id()]); 00169 PFUNC_END_TRY_BLOCK() 00170 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_information) 00171 return tptr; 00172 } 00173 00184 unsigned int current_task_group_rank () { 00185 unsigned int grank; 00186 PFUNC_START_TRY_BLOCK() 00187 grank = task_cache[current_thread_id ()].get_rank (); 00188 PFUNC_END_TRY_BLOCK() 00189 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_rank) 00190 return grank; 00191 } 00192 00203 unsigned int current_task_group_size () { 00204 unsigned int gsize; 00205 PFUNC_START_TRY_BLOCK() 00206 gsize = task_cache[current_thread_id ()].get_size (); 00207 PFUNC_END_TRY_BLOCK() 00208 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_size) 00209 return gsize; 00210 } 00211 00218 void current_task_group_barrier () { 00219 PFUNC_START_TRY_BLOCK() 00220 task_cache[current_thread_id ()].barrier (*this); 00221 PFUNC_END_TRY_BLOCK() 00222 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_barrier) 00223 } 00224 00238 void spawn_task (task& new_task, 00239 functor& new_work) { 00240 PFUNC_START_TRY_BLOCK() 00241 spawn_task (new_task, default_attribute, default_group, new_work); 00242 PFUNC_END_TRY_BLOCK() 00243 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00244 } 00245 00259 void spawn_task (task& new_task, 00260 const attribute& new_attr, 00261 functor& new_work) { 00262 PFUNC_START_TRY_BLOCK() 00263 spawn_task (new_task, new_attr, default_group, new_work); 00264 PFUNC_END_TRY_BLOCK() 00265 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00266 } 00267 00281 void spawn_task (task& new_task, 00282 const attribute& new_attr, 00283 group& new_group, 00284 functor& new_work) { 00285 PFUNC_START_TRY_BLOCK() 00286 new_task.set_attr (new_attr); 00287 new_task.set_group (&new_group); 00288 new_task.set_func (&new_work); 00289 new_task.reset_completion (new_attr.get_num_waiters()); 00290 unsigned int task_queue_number = new_attr.get_queue_number(); 00291 00292 if (QUEUE_CURRENT_THREAD == task_queue_number) /* current thread's queue*/ 00293 task_queue_number=(thread_manager.tls_get())->get_task_queue_number(); 00294 00295 task_queue->put (task_queue_number, &new_task); 00296 PFUNC_END_TRY_BLOCK() 00297 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00298 } 00299 00313 void spawn_task (void* new_task, 00314 void* new_work) { 00315 PFUNC_START_TRY_BLOCK() 00316 spawn_task (*(static_cast<task*>(new_task)), 00317 default_attribute, 00318 default_group, 00319 *(static_cast<functor*>(new_work))); 00320 PFUNC_END_TRY_BLOCK() 00321 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00322 } 00323 00338 void spawn_task (void* new_task, 00339 void* new_attr, 00340 void* new_work) { 00341 PFUNC_START_TRY_BLOCK() 00342 spawn_task (*(static_cast<task*>(new_task)), 00343 *(static_cast<attribute*>(new_attr)), 00344 default_group, 00345 *(static_cast<functor*>(new_work))); 00346 PFUNC_END_TRY_BLOCK() 00347 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00348 } 00349 00365 void spawn_task (void* new_task, 00366 void* new_attr, 00367 void* new_group, 00368 void* new_work) { 00369 PFUNC_START_TRY_BLOCK() 00370 spawn_task (*(static_cast<task*>(new_task)), 00371 *(static_cast<attribute*>(new_attr)), 00372 *(static_cast<group*>(new_group)), 00373 *(static_cast<functor*>(new_work))); 00374 PFUNC_END_TRY_BLOCK() 00375 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00376 } 00377 00387 taskmgr (const unsigned int& num_queues, 00388 const unsigned int* thds_per_queue, 00389 #if PFUNC_USE_PAPI == 1 00390 const virtual_perf_data& perf_data, 00391 #endif 00392 const unsigned int** affinity = NULL) : 00393 default_attribute (true /* is nested */, 00394 false /* grouped */), 00395 default_group () /* default constructor */, 00396 num_queues (num_queues), 00397 num_threads (0), 00398 threads_per_queue (NULL), 00399 task_queue(NULL), 00400 thread_handles (NULL), 00401 thread_data (NULL), 00402 thread_args (NULL), 00403 thread_start_count (0), 00404 #if PFUNC_USE_PAPI == 1 00405 perf_event_values (NULL), 00406 #endif 00407 thread_state (NULL), 00408 task_max_attempts (2000000) 00409 PFUNC_EXCEPT_PTR_INIT() { 00410 PFUNC_START_TRY_BLOCK() 00411 /* Allocate memory for threads_per_queue */ 00412 threads_per_queue = new unsigned int[num_queues]; 00413 00414 /* Copy-in threads_per_queue and calculate the number of threads */ 00415 for (unsigned int i=0; i<num_queues; ++i) { 00416 threads_per_queue[i] = thds_per_queue[i]; 00417 num_threads += threads_per_queue[i]; 00418 } 00419 00420 /* Set the ID of the main thread to be num_threads */ 00421 main_thread_attr = new thread_attr (PFUNC_STACK_MAX,/* main thread stack */ 00422 num_threads, /* main thread ID number */ 00423 0, 00424 0), /* put main thread jobs on Q 0 */ 00425 00426 #if PFUNC_HAVE_TLS == 1 00427 /* Allocate memory for the thread data */ 00428 thread_manager.initialize (num_threads+1/*for the main thread*/); 00429 pfunc_thread_self_id = num_threads; /*for the main thread*/ 00430 #endif 00431 00432 /* Allocate memory for the Queues */ 00433 task_queue = new queue_type(num_queues); 00434 00435 /* Allocate memory to hold the handles */ 00436 thread_handles = new thread_handle_type [num_threads]; 00437 00438 /* Allocate memory to hold the thread_data */ 00439 thread_data = new thread_attr*[num_threads]; 00440 00441 /* Allocate memory to hold the arguments to the thread function */ 00442 thread_args = new reroute_function_arg*[num_threads]; 00443 00444 /* Allocate memory to hold the tasks. This is used for the cache */ 00445 task_cache = new task[num_threads]; 00446 00447 /* Allocate memory for the thread_state */ 00448 thread_state = new aligned_bool [num_threads]; 00449 00450 /* Add the main thread's attribute to TLS */ 00451 thread_manager.tls_set (main_thread_attr); 00452 00453 /* set the barrier */ 00454 start_up_barrier.initialize (num_threads); 00455 00456 /* If PERF has been defined, then so be it */ 00457 #if PFUNC_USE_PAPI == 1 00458 num_events = perf_data.get_num_events (); 00459 event_codes = perf_data.get_events (); 00460 perf_event_values = perf_data.get_event_storage (); 00461 00462 if (!perf::initialize (event_codes, num_events)) 00463 printf ("Could not initialize performance counting\n"); 00464 #endif 00465 00466 /* Now to create threads */ 00467 int index = -1; 00468 00469 for (unsigned int i=0; i<num_queues; ++i) { 00470 for (unsigned int j=0; j<threads_per_queue[i]; ++j) { 00471 ++index; 00472 thread_data[index] = new thread_attr 00473 (PFUNC_STACK_MAX, /* stack_size */ 00474 index, /* Thread ID */ 00475 (NULL==affinity)? PFUNC_NO_AFFINITY: 00476 affinity [i][j], 00477 i); /* Queue Number*/ 00478 00479 thread_args[index]=new reroute_function_arg(this, thread_data[index]); 00480 00481 thread_manager.create_thread (thread_handles[index], /* handle */ 00482 thread_data[index], /* attributes */ 00483 reroute_function, /* trampoline function */ 00484 thread_args[index]); /* arg */ 00485 } 00486 } 00487 00488 /* Make sure that we do not exit this function until all the threads */ 00489 while (thread_start_count != static_cast<unsigned int> (num_threads)); 00490 PFUNC_END_TRY_BLOCK() 00491 PFUNC_CATCH_AND_RETHROW(taskmgr,taskmgr) 00492 } 00493 00497 virtual ~taskmgr () { 00498 PFUNC_START_TRY_BLOCK() 00499 00500 00501 for (unsigned int i=0; i<num_threads; ++i) thread_state[i].cancel (); 00502 00504 for (unsigned int i=0; i<num_threads; ++i) 00505 thread_manager.join_thread (thread_handles[i]); 00506 00507 for (unsigned int i=0; i<num_threads; ++i) { 00508 delete thread_data[i]; 00509 delete thread_args[i]; 00510 } 00511 00512 delete task_queue; 00513 delete [] thread_handles; 00514 delete [] thread_data; 00515 delete [] task_cache; 00516 delete [] thread_args; 00517 delete [] threads_per_queue; 00518 delete [] thread_state; 00519 delete main_thread_attr; 00520 00521 PFUNC_EXCEPT_PTR_CLEAR() 00522 PFUNC_END_TRY_BLOCK() 00523 PFUNC_CATCH_AND_RETHROW(taskmgr,~taskmgr) 00524 } 00525 00531 void set_max_attempts (const unsigned int& max_attempts) { 00532 task_max_attempts = max_attempts; 00533 } 00534 00540 unsigned int get_max_attempts () const { 00541 return task_max_attempts; 00542 } 00543 00559 template <typename CompletionPredicate, typename TaskPredicate> 00560 task* get_task (const CompletionPredicate& completion_pred, 00561 const unsigned int& max_attempts, 00562 const unsigned int& queue_number, 00563 const TaskPredicate& task_pred) { 00564 task* return_value = NULL; 00565 00566 PFUNC_START_TRY_BLOCK() 00567 00568 unsigned int num_attempts = max_attempts; 00579 do { 00580 while (!completion_pred() && (0<num_attempts--)) { 00581 if (NULL != 00582 (return_value = task_queue->get (queue_number, task_pred))) break; 00583 } 00584 if (completion_pred() || NULL!=return_value) break; 00585 else num_attempts = (0==(max_attempts/2))? 1: (max_attempts/2); 00586 pfunc::detail::thread::yield(); 00587 } while (true); 00588 00589 PFUNC_END_TRY_BLOCK() 00590 PFUNC_CATCH_AND_RETHROW(taskmgr,get_task) 00591 00592 return return_value; 00593 } 00594 00608 void operator()(void* _my_attr) { 00609 00610 PFUNC_START_TRY_BLOCK() 00611 00612 /* Get information about self */ 00613 thread_attr* my_attr = 00614 static_cast<thread_attr*> (_my_attr); 00615 00616 /* Get the information pertaining to this thread */ 00617 const unsigned int my_thread_id = my_attr->get_thread_id (); 00618 const unsigned int my_task_queue_number = my_attr->get_task_queue_number (); 00619 const unsigned int my_processor_affinity = my_attr->get_thread_affinity (); 00620 00621 /* Set my id for thread_attr access from other places */ 00622 #if PFUNC_HAVE_TLS == 1 00623 pfunc_thread_self_id = my_thread_id; 00624 #endif 00625 00626 /* Save the attribute for later */ 00627 thread_manager.tls_set (my_attr); 00628 00629 /* Let us set the processor affinity now */ 00630 if (PFUNC_NO_AFFINITY!=my_processor_affinity) 00631 thread_manager.set_affinity (my_processor_affinity); 00632 00633 /* Wait for all the threads to get here */ 00634 start_up_barrier(); 00635 00636 /* If performance has been defined, create events and start counting */ 00637 #if PFUNC_USE_PAPI == 1 00638 int my_event_set = perf::create_events (); 00639 if (!perf::start_events (my_event_set)) 00640 printf ("%u: Could not start counting\n", my_thread_id); 00641 #endif 00642 00643 /* When everything is set up, signal the main thread */ 00644 pfunc_fetch_and_add_32 (&thread_start_count, 1); 00645 00646 /* WORK LOOP */ 00647 task* my_task = NULL; 00648 while (NULL != (my_task = get_task ((thread_state[my_thread_id]), 00649 task_max_attempts, 00650 my_task_queue_number, 00651 regular_predicate(NULL)))) { 00652 task_cache [my_thread_id].shallow_copy(*my_task); /* Set the cache */ 00653 my_task->run (); /* Now, lets run the job */ 00654 my_task->notify (); /* signal whoever was waiting */ 00655 } 00656 00658 #if PFUNC_USE_PAPI == 1 00659 if (!perf::stop_events (my_event_set, perf_event_values[my_thread_id])) 00660 printf ("%u: Could not stop counting\n", my_thread_id); 00661 #endif 00662 00663 PFUNC_END_TRY_BLOCK() 00664 PFUNC_CATCH_AND_RETHROW(taskmgr,op_paranthesis) 00665 00666 // We do not return from here --- so it has to be outside the 00667 // try catch block! 00668 thread_manager.exit_thread (); 00669 } 00670 00679 void progress_wait (event<testable_event>& compl_event) { 00680 00681 PFUNC_START_TRY_BLOCK() 00682 00683 thread_attr& my_data = *(thread_manager.tls_get ()); 00684 00685 const unsigned int my_thread_id = my_data.get_thread_id (); 00686 const unsigned int my_task_queue_number = my_data.get_task_queue_number (); 00687 00689 task_completion_predicate completion_pred (compl_event); 00690 00697 if (num_threads == my_thread_id) { 00698 while (!completion_pred()) pfunc::detail::thread::yield (); 00699 } else { 00700 task current_task; 00701 00702 current_task.shallow_copy (task_cache[my_thread_id]); 00703 00704 task* my_task = NULL; 00705 while (NULL != (my_task = get_task (completion_pred, 00706 task_max_attempts, 00707 my_task_queue_number, 00708 waiting_predicate (¤t_task)))) { 00709 00710 /* This task might steal again, set it to be in the cache */ 00711 task_cache[my_thread_id].shallow_copy(*my_task); 00712 00713 /* run the task */ 00714 my_task->run (); 00715 00716 /* Notify the waiters on the second task */ 00717 my_task->notify (); 00718 00719 /* We have finished stealing, reset the cache again */ 00720 task_cache[my_thread_id].shallow_copy(current_task); 00721 } 00722 } 00723 00724 PFUNC_END_TRY_BLOCK() 00725 PFUNC_CATCH_AND_RETHROW(taskmgr,progress_wait) 00726 } 00727 00732 void progress_barrier () { 00733 00734 PFUNC_START_TRY_BLOCK() 00735 00736 thread_attr& my_data = *(thread_manager.tls_get ()); 00737 if (num_threads == my_data.get_thread_id ()) return; 00738 00739 const unsigned int my_thread_id = my_data.get_thread_id (); 00740 const unsigned int my_task_queue_number = my_data.get_task_queue_number (); 00741 00742 task current_task; 00743 00744 current_task.shallow_copy (task_cache[my_thread_id]); 00745 00746 task* my_task = task_queue->get (my_task_queue_number, 00747 group_predicate (¤t_task)); 00748 00749 if (NULL == my_task) return; 00750 00751 /* This task might steal again, set it to be in the cache */ 00752 task_cache[my_thread_id].shallow_copy(*my_task); 00753 00754 /* run the task */ 00755 my_task->run (); 00756 00757 /* Notify the waiters on the second task */ 00758 my_task->notify (); 00759 00760 /* We have finished stealing, reset the cache again */ 00761 task_cache[my_thread_id].shallow_copy(current_task); 00762 00763 PFUNC_END_TRY_BLOCK() 00764 PFUNC_CATCH_AND_RETHROW(taskmgr,progress_wait) 00765 } 00766 00770 unsigned int get_num_queues () const { return num_queues; } 00771 00775 unsigned int get_num_threads () const { return num_threads; } 00776 }; 00777 } /* namespace detail */ } /* namespace pfunc */ 00778 #endif // PFUNC_TASKMGR_HPP