// PFUNC 1.0
00001 #ifndef PFUNC_TASKMGR_HPP 00002 #define PFUNC_TASKMGR_HPP 00003 00004 00010 #include <cstdio> 00011 #include <algorithm> 00012 #include <iterator> 00013 #include <limits> 00014 #include <utility> 00015 #include <map> 00016 00017 #include <pfunc/pfunc_common.h> 00018 #include <pfunc/pfunc_atomics.h> 00019 #include <pfunc/exception.hpp> 00020 #include <pfunc/mutex.hpp> 00021 #include <pfunc/barrier.hpp> 00022 #include <pfunc/thread.hpp> 00023 00024 #if PFUNC_USE_PAPI == 1 00025 #include <pfunc/perf.hpp> 00026 #endif 00027 00028 #include <pfunc/event.hpp> 00029 #include <pfunc/group.hpp> 00030 #include <pfunc/attribute.hpp> 00031 #include <pfunc/task.hpp> 00032 #include <pfunc/trampolines.hpp> 00033 #include <pfunc/task_queue_set.hpp> 00034 #include <pfunc/predicate.hpp> 00035 #include <pfunc/environ.hpp> 00036 00047 namespace pfunc { namespace detail { 00048 00063 template <typename SchedPolicyName, 00064 typename Task> 00065 struct taskmgr : public taskmgr_virtual_base { 00066 typedef Task task; 00067 typedef typename task::functor functor; 00068 typedef SchedPolicyName sched_policy_name; 00069 typedef task_queue_set<sched_policy_name, task> queue_type; 00070 typedef typename task::attribute attribute; 00071 typedef typename attribute::priority_type priority_type; 00072 typedef thread::native_thread_id_type native_thread_id_type; 00074 typedef regular_predicate_pair<sched_policy_name, task> regular_predicate; 00075 typedef waiting_predicate_pair<sched_policy_name, task> waiting_predicate; 00076 typedef group_predicate_pair<sched_policy_name, task> group_predicate; 00077 typedef typename thread::thread_handle_type thread_handle_type; 00078 00079 /* Default values for attribute and group */ 00080 const attribute default_attribute; 00081 group default_group; 00083 private: 00084 const unsigned int num_queues; 00085 unsigned int num_threads; 00086 unsigned int* threads_per_queue; 00087 queue_type* task_queue; 00088 thread_handle_type* thread_handles; 00089 
thread_attr** thread_data; 00090 task* task_cache; 00091 reroute_function_arg** thread_args; 00092 volatile unsigned int thread_start_count; 00093 thread_attr* main_thread_attr; 00094 thread thread_manager; 00095 barrier start_up_barrier; 00096 #if PFUNC_USE_PAPI == 1 00097 long long** perf_event_values; 00098 int num_events; 00099 int *event_codes; 00100 #endif 00101 00104 struct aligned_bool { 00105 ALIGN128 volatile bool is_cancelled; 00106 aligned_bool () : is_cancelled (false) {} 00107 void cancel () { is_cancelled = true; } 00108 bool operator()() const { return is_cancelled; } 00109 }; 00110 aligned_bool* thread_state; 00111 unsigned int task_max_attempts; 00112 PFUNC_DEFINE_EXCEPT_PTR() 00117 struct task_completion_predicate { 00118 event<testable_event>& compl_event; 00123 task_completion_predicate (event<testable_event>& compl_event) : 00124 compl_event (compl_event) {} 00125 00130 bool operator ()() const { return compl_event.test(); } 00131 }; 00132 00133 public: 00145 unsigned int current_thread_id () { 00146 unsigned int tid; 00147 PFUNC_START_TRY_BLOCK() 00148 tid = (thread_manager.tls_get ())->get_thread_id (); 00149 PFUNC_END_TRY_BLOCK() 00150 PFUNC_CATCH_AND_RETHROW(taskmgr,current_thread_id) 00151 return tid; 00152 } 00153 00165 task* current_task_information () { 00166 task* tptr; 00167 PFUNC_START_TRY_BLOCK() 00168 tptr = &(task_cache[current_thread_id()]); 00169 PFUNC_END_TRY_BLOCK() 00170 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_information) 00171 return tptr; 00172 } 00173 00184 unsigned int current_task_group_rank () { 00185 unsigned int grank; 00186 PFUNC_START_TRY_BLOCK() 00187 grank = task_cache[current_thread_id ()].get_rank (); 00188 PFUNC_END_TRY_BLOCK() 00189 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_rank) 00190 return grank; 00191 } 00192 00203 unsigned int current_task_group_size () { 00204 unsigned int gsize; 00205 PFUNC_START_TRY_BLOCK() 00206 gsize = task_cache[current_thread_id ()].get_size (); 00207 
PFUNC_END_TRY_BLOCK() 00208 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_size) 00209 return gsize; 00210 } 00211 00218 void current_task_group_barrier () { 00219 PFUNC_START_TRY_BLOCK() 00220 task_cache[current_thread_id ()].barrier (*this); 00221 PFUNC_END_TRY_BLOCK() 00222 PFUNC_CATCH_AND_RETHROW(taskmgr,current_task_group_barrier) 00223 } 00224 00238 void spawn_task (task& new_task, 00239 functor& new_work) { 00240 PFUNC_START_TRY_BLOCK() 00241 spawn_task (new_task, default_attribute, default_group, new_work); 00242 PFUNC_END_TRY_BLOCK() 00243 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00244 } 00245 00259 void spawn_task (task& new_task, 00260 const attribute& new_attr, 00261 functor& new_work) { 00262 PFUNC_START_TRY_BLOCK() 00263 spawn_task (new_task, new_attr, default_group, new_work); 00264 PFUNC_END_TRY_BLOCK() 00265 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00266 } 00267 00281 void spawn_task (task& new_task, 00282 const attribute& new_attr, 00283 group& new_group, 00284 functor& new_work) { 00285 PFUNC_START_TRY_BLOCK() 00286 new_task.set_attr (new_attr); 00287 new_task.set_group (&new_group); 00288 new_task.set_func (&new_work); 00289 new_task.reset_completion (new_attr.get_num_waiters()); 00290 unsigned int task_queue_number = new_attr.get_queue_number(); 00291 00292 if (QUEUE_CURRENT_THREAD == task_queue_number) /* current thread's queue*/ 00293 task_queue_number=(thread_manager.tls_get())->get_task_queue_number(); 00294 00295 task_queue->put (task_queue_number, &new_task); 00296 PFUNC_END_TRY_BLOCK() 00297 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00298 } 00299 00313 void spawn_task (void* new_task, 00314 void* new_work) { 00315 PFUNC_START_TRY_BLOCK() 00316 spawn_task (*(static_cast<task*>(new_task)), 00317 default_attribute, 00318 default_group, 00319 *(static_cast<functor*>(new_work))); 00320 PFUNC_END_TRY_BLOCK() 00321 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00322 } 00323 00338 void spawn_task (void* new_task, 00339 void* new_attr, 
00340 void* new_work) { 00341 PFUNC_START_TRY_BLOCK() 00342 spawn_task (*(static_cast<task*>(new_task)), 00343 *(static_cast<attribute*>(new_attr)), 00344 default_group, 00345 *(static_cast<functor*>(new_work))); 00346 PFUNC_END_TRY_BLOCK() 00347 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00348 } 00349 00365 void spawn_task (void* new_task, 00366 void* new_attr, 00367 void* new_group, 00368 void* new_work) { 00369 PFUNC_START_TRY_BLOCK() 00370 spawn_task (*(static_cast<task*>(new_task)), 00371 *(static_cast<attribute*>(new_attr)), 00372 *(static_cast<group*>(new_group)), 00373 *(static_cast<functor*>(new_work))); 00374 PFUNC_END_TRY_BLOCK() 00375 PFUNC_CATCH_AND_RETHROW(taskmgr,spawn_task) 00376 } 00377 00387 taskmgr (const unsigned int& num_queues, 00388 const unsigned int* thds_per_queue, 00389 #if PFUNC_USE_PAPI == 1 00390 const virtual_perf_data& perf_data, 00391 #endif 00392 const unsigned int** affinity = NULL) : 00393 default_attribute (true /* is nested */, 00394 false /* grouped */), 00395 default_group () /* default constructor */, 00396 num_queues (num_queues), 00397 num_threads (0), 00398 threads_per_queue (NULL), 00399 task_queue(NULL), 00400 thread_handles (NULL), 00401 thread_data (NULL), 00402 thread_args (NULL), 00403 thread_start_count (0), 00404 #if PFUNC_USE_PAPI == 1 00405 perf_event_values (NULL), 00406 #endif 00407 thread_state (NULL), 00408 task_max_attempts (2000000) 00409 PFUNC_EXCEPT_PTR_INIT() { 00410 PFUNC_START_TRY_BLOCK() 00411 /* Allocate memory for threads_per_queue */ 00412 threads_per_queue = new unsigned int[num_queues]; 00413 00414 /* Copy-in threads_per_queue and calculate the number of threads */ 00415 for (unsigned int i=0; i<num_queues; ++i) { 00416 threads_per_queue[i] = thds_per_queue[i]; 00417 num_threads += threads_per_queue[i]; 00418 } 00419 00420 /* Set the ID of the main thread to be num_threads */ 00421 main_thread_attr = new thread_attr (PFUNC_STACK_MAX,/* main thread stack */ 00422 num_threads, /* main thread ID 
number */ 00423 0, 00424 0), /* put main thread jobs on Q 0 */ 00425 00426 #if PFUNC_HAVE_TLS == 1 00427 /* Allocate memory for the thread data */ 00428 thread_manager.initialize (num_threads+1/*for the main thread*/); 00429 pfunc_thread_self_id = num_threads; /*for the main thread*/ 00430 #endif 00431 00432 /* Allocate memory for the Queues */ 00433 task_queue = new queue_type(num_queues); 00434 00435 /* Allocate memory to hold the handles */ 00436 thread_handles = new thread_handle_type [num_threads]; 00437 00438 /* Allocate memory to hold the thread_data */ 00439 thread_data = new thread_attr*[num_threads]; 00440 00441 /* Allocate memory to hold the arguments to the thread function */ 00442 thread_args = new reroute_function_arg*[num_threads]; 00443 00444 /* Allocate memory to hold the tasks. This is used for the cache */ 00445 task_cache = new task[num_threads]; 00446 00447 /* Allocate memory for the thread_state */ 00448 thread_state = new aligned_bool [num_threads]; 00449 00450 /* Add the main thread's attribute to TLS */ 00451 thread_manager.tls_set (main_thread_attr); 00452 00453 /* set the barrier */ 00454 start_up_barrier.initialize (num_threads); 00455 00456 /* If PERF has been defined, then so be it */ 00457 #if PFUNC_USE_PAPI == 1 00458 num_events = perf_data.get_num_events (); 00459 event_codes = perf_data.get_events (); 00460 perf_event_values = perf_data.get_event_storage (); 00461 00462 if (!perf::initialize (event_codes, num_events)) 00463 printf ("Could not initialize performance counting\n"); 00464 #endif 00465 00466 /* Now to create threads */ 00467 int index = -1; 00468 00469 for (unsigned int i=0; i<num_queues; ++i) { 00470 for (unsigned int j=0; j<threads_per_queue[i]; ++j) { 00471 ++index; 00472 thread_data[index] = new thread_attr 00473 (PFUNC_STACK_MAX, /* stack_size */ 00474 index, /* Thread ID */ 00475 (NULL==affinity)? 
PFUNC_NO_AFFINITY: 00476 affinity [i][j], 00477 i); /* Queue Number*/ 00478 00479 thread_args[index]=new reroute_function_arg(this, thread_data[index]); 00480 00481 thread_manager.create_thread (thread_handles[index], /* handle */ 00482 thread_data[index], /* attributes */ 00483 reroute_function, /* trampoline function */ 00484 thread_args[index]); /* arg */ 00485 } 00486 } 00487 00488 /* Make sure that we do not exit this function until all the threads */ 00489 while (thread_start_count != static_cast<unsigned int> (num_threads)); 00490 PFUNC_END_TRY_BLOCK() 00491 PFUNC_CATCH_AND_RETHROW(taskmgr,taskmgr) 00492 } 00493 00497 virtual ~taskmgr () { 00498 PFUNC_START_TRY_BLOCK() 00499 00500 00501 for (unsigned int i=0; i<num_threads; ++i) thread_state[i].cancel (); 00502 00504 for (unsigned int i=0; i<num_threads; ++i) 00505 thread_manager.join_thread (thread_handles[i]); 00506 00507 for (unsigned int i=0; i<num_threads; ++i) { 00508 delete thread_data[i]; 00509 delete thread_args[i]; 00510 } 00511 00512 delete task_queue; 00513 delete [] thread_handles; 00514 delete [] thread_data; 00515 delete [] task_cache; 00516 delete [] thread_args; 00517 delete [] threads_per_queue; 00518 delete [] thread_state; 00519 delete main_thread_attr; 00520 00521 PFUNC_EXCEPT_PTR_CLEAR() 00522 PFUNC_END_TRY_BLOCK() 00523 PFUNC_CATCH_AND_RETHROW(taskmgr,~taskmgr) 00524 } 00525 00531 void set_max_attempts (const unsigned int& max_attempts) { 00532 task_max_attempts = max_attempts; 00533 } 00534 00540 unsigned int get_max_attempts () const { 00541 return task_max_attempts; 00542 } 00543 00559 template <typename CompletionPredicate, typename TaskPredicate> 00560 task* get_task (const CompletionPredicate& completion_pred, 00561 const unsigned int& max_attempts, 00562 const unsigned int& queue_number, 00563 const TaskPredicate& task_pred) { 00564 unsigned int num_attempts = max_attempts; 00565 task* return_value = NULL; 00566 00567 PFUNC_START_TRY_BLOCK() 00568 00569 while (!completion_pred() 
&& (0 < num_attempts--)) { 00570 if (NULL != (return_value = 00571 task_queue->get (queue_number, task_pred))) 00572 goto end_of_get_task; 00573 } 00574 00576 if (!completion_pred()) { 00577 thread_manager.yield (); 00578 return_value = get_task (completion_pred, 00579 (0==(max_attempts/2)) ? 1 : (max_attempts/2), 00580 queue_number, 00581 task_pred); 00582 } 00583 00584 PFUNC_END_TRY_BLOCK() 00585 PFUNC_CATCH_AND_RETHROW(taskmgr,get_task) 00586 00587 end_of_get_task: 00588 return return_value; 00589 } 00590 00604 void operator()(void* _my_attr) { 00605 00606 PFUNC_START_TRY_BLOCK() 00607 00608 /* Get information about self */ 00609 thread_attr* my_attr = 00610 static_cast<thread_attr*> (_my_attr); 00611 00612 /* Get the information pertaining to this thread */ 00613 const unsigned int my_thread_id = my_attr->get_thread_id (); 00614 const unsigned int my_task_queue_number = my_attr->get_task_queue_number (); 00615 const unsigned int my_processor_affinity = my_attr->get_thread_affinity (); 00616 00617 /* Set my id for thread_attr access from other places */ 00618 #if PFUNC_HAVE_TLS == 1 00619 pfunc_thread_self_id = my_thread_id; 00620 #endif 00621 00622 /* Save the attribute for later */ 00623 thread_manager.tls_set (my_attr); 00624 00625 /* Let us set the processor affinity now */ 00626 if (PFUNC_NO_AFFINITY!=my_processor_affinity) 00627 thread_manager.set_affinity (my_processor_affinity); 00628 00629 /* Wait for all the threads to get here */ 00630 start_up_barrier(); 00631 00632 /* If performance has been defined, create events and start counting */ 00633 #if PFUNC_USE_PAPI == 1 00634 int my_event_set = perf::create_events (); 00635 if (!perf::start_events (my_event_set)) 00636 printf ("%u: Could not start counting\n", my_thread_id); 00637 #endif 00638 00639 /* When everything is set up, signal the main thread */ 00640 pfunc_fetch_and_add_32 (&thread_start_count, 1); 00641 00642 /* WORK LOOP */ 00643 task* my_task = NULL; 00644 while (NULL != (my_task = get_task 
((thread_state[my_thread_id]), 00645 task_max_attempts, 00646 my_task_queue_number, 00647 regular_predicate(NULL)))) { 00648 task_cache [my_thread_id].shallow_copy(*my_task); /* Set the cache */ 00649 my_task->run (); /* Now, lets run the job */ 00650 my_task->notify (); /* signal whoever was waiting */ 00651 } 00652 00654 #if PFUNC_USE_PAPI == 1 00655 if (!perf::stop_events (my_event_set, perf_event_values[my_thread_id])) 00656 printf ("%u: Could not stop counting\n", my_thread_id); 00657 #endif 00658 00659 PFUNC_END_TRY_BLOCK() 00660 PFUNC_CATCH_AND_RETHROW(taskmgr,op_paranthesis) 00661 00662 // We do not return from here --- so it has to be outside the 00663 // try catch block! 00664 thread_manager.exit_thread (); 00665 } 00666 00675 void progress_wait (event<testable_event>& compl_event) { 00676 00677 PFUNC_START_TRY_BLOCK() 00678 00679 thread_attr& my_data = *(thread_manager.tls_get ()); 00680 00681 const unsigned int my_thread_id = my_data.get_thread_id (); 00682 const unsigned int my_task_queue_number = my_data.get_task_queue_number (); 00683 00685 task_completion_predicate completion_pred (compl_event); 00686 00693 if (num_threads == my_thread_id) { 00694 while (!completion_pred()) thread_manager.yield (); 00695 } else { 00696 task current_task; 00697 00698 current_task.shallow_copy (task_cache[my_thread_id]); 00699 00700 task* my_task = NULL; 00701 while (NULL != (my_task = get_task (completion_pred, 00702 task_max_attempts, 00703 my_task_queue_number, 00704 waiting_predicate (¤t_task)))) { 00705 00706 /* This task might steal again, set it to be in the cache */ 00707 task_cache[my_thread_id].shallow_copy(*my_task); 00708 00709 /* run the task */ 00710 my_task->run (); 00711 00712 /* Notify the waiters on the second task */ 00713 my_task->notify (); 00714 00715 /* We have finished stealing, reset the cache again */ 00716 task_cache[my_thread_id].shallow_copy(current_task); 00717 } 00718 } 00719 00720 PFUNC_END_TRY_BLOCK() 00721 
PFUNC_CATCH_AND_RETHROW(taskmgr,progress_wait) 00722 } 00723 00728 void progress_barrier () { 00729 00730 PFUNC_START_TRY_BLOCK() 00731 00732 thread_attr& my_data = *(thread_manager.tls_get ()); 00733 if (num_threads == my_data.get_thread_id ()) return; 00734 00735 const unsigned int my_thread_id = my_data.get_thread_id (); 00736 const unsigned int my_task_queue_number = my_data.get_task_queue_number (); 00737 00738 task current_task; 00739 00740 current_task.shallow_copy (task_cache[my_thread_id]); 00741 00742 task* my_task = task_queue->get (my_task_queue_number, 00743 group_predicate (¤t_task)); 00744 00745 if (NULL == my_task) return; 00746 00747 /* This task might steal again, set it to be in the cache */ 00748 task_cache[my_thread_id].shallow_copy(*my_task); 00749 00750 /* run the task */ 00751 my_task->run (); 00752 00753 /* Notify the waiters on the second task */ 00754 my_task->notify (); 00755 00756 /* We have finished stealing, reset the cache again */ 00757 task_cache[my_thread_id].shallow_copy(current_task); 00758 00759 PFUNC_END_TRY_BLOCK() 00760 PFUNC_CATCH_AND_RETHROW(taskmgr,progress_wait) 00761 } 00762 00766 unsigned int get_num_queues () const { return num_queues; } 00767 00771 unsigned int get_num_threads () const { return num_threads; } 00772 }; 00773 } /* namespace detail */ } /* namespace pfunc */ 00774 #endif // PFUNC_TASKMGR_HPP