PFUNC
1.0
|
Main class that implements the tasking aspect. More...
#include <pfunc/taskmgr.hpp>
Classes | |
struct | aligned_bool |
Public Types | |
typedef Task | task |
typedef task::functor | functor |
typedef SchedPolicyName | sched_policy_name |
typedef task_queue_set < sched_policy_name, task > | queue_type |
typedef task::attribute | attribute |
typedef attribute::priority_type | priority_type |
typedef thread::native_thread_id_type | native_thread_id_type |
typedef regular_predicate_pair < sched_policy_name, task > | regular_predicate |
typedef waiting_predicate_pair < sched_policy_name, task > | waiting_predicate |
typedef group_predicate_pair < sched_policy_name, task > | group_predicate |
typedef thread::thread_handle_type | thread_handle_type |
Public Member Functions | |
unsigned int | current_thread_id () |
Returns information regarding the current thread. | |
task * | current_task_information () |
Returns information regarding the current task being executed. | |
unsigned int | current_task_group_rank () |
Returns the rank of the calling task in its group. | |
unsigned int | current_task_group_size () |
Returns the size of the calling task's group. | |
void | current_task_group_barrier () |
Executes a barrier accross the group of the currently executing task (and hence, thread). Most of the details regarding the barrier are stored with the task -- so we just refer back to the current task that we cache. | |
void | spawn_task (task &new_task, functor &new_work) |
void | spawn_task (task &new_task, const attribute &new_attr, functor &new_work) |
void | spawn_task (task &new_task, const attribute &new_attr, group &new_group, functor &new_work) |
void | spawn_task (void *new_task, void *new_work) |
void | spawn_task (void *new_task, void *new_attr, void *new_work) |
void | spawn_task (void *new_task, void *new_attr, void *new_group, void *new_work) |
taskmgr (const unsigned int &num_queues, const unsigned int *thds_per_queue, const unsigned int **affinity=NULL) | |
Create all the threads. Make them wait on the task queue. | |
virtual | ~taskmgr () |
void | set_max_attempts (const unsigned int &max_attempts) |
unsigned int | get_max_attempts () const |
template<typename CompletionPredicate , typename TaskPredicate > | |
task * | get_task (const CompletionPredicate &completion_pred, const unsigned int &max_attempts, const unsigned int &queue_number, const TaskPredicate &task_pred) |
void | operator() (void *_my_attr) |
void | progress_wait (event< testable_event > &compl_event) |
void | progress_barrier () |
unsigned int | get_num_queues () const |
unsigned int | get_num_threads () const |
Public Attributes | |
const attribute | default_attribute |
group | default_group |
Private Member Functions | |
PFUNC_DEFINE_EXCEPT_PTR () struct task_completion_predicate | |
Private Attributes | |
const unsigned int | num_queues |
unsigned int | num_threads |
unsigned int * | threads_per_queue |
queue_type * | task_queue |
thread_handle_type * | thread_handles |
thread_attr ** | thread_data |
task * | task_cache |
reroute_function_arg ** | thread_args |
volatile unsigned int | thread_start_count |
thread_attr * | main_thread_attr |
thread | thread_manager |
barrier | start_up_barrier |
aligned_bool * | thread_state |
unsigned int | task_max_attempts |
Main class that implements the tasking aspect.
SchedPolicyName | The scheduling policy to use. |
Task | The type of the task to use. |
This is the main struct that implements the functionality provided in pfunc tool kit. Of the many things implemented in this class, the important functions are: 1. Create X number of threads and Y number of queues. 2. Add a task to a queue. 3. Wait for completions for those added tasks.
typedef task::attribute pfunc::detail::taskmgr< SchedPolicyName, Task >::attribute |
To know the attribute
typedef task::functor pfunc::detail::taskmgr< SchedPolicyName, Task >::functor |
Type of the functor
typedef group_predicate_pair<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::group_predicate |
typedef thread::native_thread_id_type pfunc::detail::taskmgr< SchedPolicyName, Task >::native_thread_id_type |
used for storage
typedef attribute::priority_type pfunc::detail::taskmgr< SchedPolicyName, Task >::priority_type |
To know what priority exit_job
typedef task_queue_set<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::queue_type |
scheduler
typedef regular_predicate_pair<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::regular_predicate |
typedef SchedPolicyName pfunc::detail::taskmgr< SchedPolicyName, Task >::sched_policy_name |
Type of scheduler in use
typedef Task pfunc::detail::taskmgr< SchedPolicyName, Task >::task |
type of task
typedef thread::thread_handle_type pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_handle_type |
typedef waiting_predicate_pair<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::waiting_predicate |
pfunc::detail::taskmgr< SchedPolicyName, Task >::taskmgr | ( | const unsigned int & | num_queues, |
const unsigned int * | thds_per_queue, | ||
const unsigned int ** | affinity = NULL |
||
) | [inline] |
Create all the threads. Make them wait on the task queue.
Constructor
[in] | num_queues | Number of queues to create. |
[in] | thds_per_queue | Number of threads to create per queue. |
[in] | perf_data | Performance metrics to be collected. |
[in] | affinity | The affinity of the threads to the proceseors. |
virtual pfunc::detail::taskmgr< SchedPolicyName, Task >::~taskmgr | ( | ) | [inline, virtual] |
Destructor
Cancel all the threads
Wait for their completion
void pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_group_barrier | ( | ) | [inline, virtual] |
Executes a barrier accross the group of the currently executing task (and hence, thread). Most of the details regarding the barrier are stored with the task -- so we just refer back to the current task that we cache.
Implements pfunc::detail::taskmgr_virtual_base.
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_group_rank | ( | ) | [inline, virtual] |
Returns the rank of the calling task in its group.
We allow each task to get its rank and size from the running environment. This is possible since we always cache each thread's currently executing task.
Implements pfunc::detail::taskmgr_virtual_base.
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_group_size | ( | ) | [inline, virtual] |
Returns the size of the calling task's group.
We allow each task to get its rank and size from the running environment. This is possible since we always cache each thread's currently executing task.
Implements pfunc::detail::taskmgr_virtual_base.
task* pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_information | ( | ) | [inline] |
Returns information regarding the current task being executed.
Every thread stores the current task that it is executing. This means that we can get the handle, group and work attributes associated with every task that a particular thread is executing. Hence, tasks can query this information without needing actually explictly passing any information.
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::current_thread_id | ( | ) | [inline, virtual] |
Returns information regarding the current thread.
Every task is being executed by some thread or the other. This function returns the ID of the thread that is being executed. This is an uint. If the thread querying for information is NOT a PFunc thread, we return -1
Implements pfunc::detail::taskmgr_virtual_base.
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::get_max_attempts | ( | ) | const [inline, virtual] |
Get the maximum number of attempts before backoff -- ~0x0 by default
Implements pfunc::detail::taskmgr_virtual_base.
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::get_num_queues | ( | ) | const [inline, virtual] |
Implements pfunc::detail::taskmgr_virtual_base.
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::get_num_threads | ( | ) | const [inline, virtual] |
Implements pfunc::detail::taskmgr_virtual_base.
task* pfunc::detail::taskmgr< SchedPolicyName, Task >::get_task | ( | const CompletionPredicate & | completion_pred, |
const unsigned int & | max_attempts, | ||
const unsigned int & | queue_number, | ||
const TaskPredicate & | task_pred | ||
) | [inline] |
Function that retrieves a task from the task_queue (preferably from the thread's own) with some amount of regulation builtin. The regulation is that if we cannot find a suitable task for a X number of attempts, we relinquish control of the processor and try back with X/2 attempts. Not quite an exponential backoff, but it does for now.
[in] | completion_pred | A boolean predicate that signals the completion of the waiting. |
[in] | max_attempts | The maximum number of attempts to make. |
[in] | queue_number | The primary queue number for the calling thread. |
[in] | task_pred | The predicate based on which the task is selected. |
Anju: Bug Fix: Earlier, when we ran out of the number of attempts, we would recursively call get_task with half as many attempts. When there is no task to pick from the queue for a long time, this results in a long long recursion stack, eventually leading to explosion. Fixing this by having two loops --- ugly but works. Also, notice that this depends on the completion_pred() being twice testable. I am assuming that this is OK since the originally loop tested the completion predicate multiple times.
void pfunc::detail::taskmgr< SchedPolicyName, Task >::operator() | ( | void * | _my_attr | ) | [inline, virtual] |
Endless loop that the worker threads execute
[in] | _my_attr | thread_attr* cast as a void*. |
This function is invoked at the start of each thread. As such, each thread waits in an endless loop for work. The only piece of information that this thread needs is the JOBQUEUE which it is supposed to wait on. Also, an ID of some form would be nice.
Upon completion of the job, it sets the status of the job to in a manner which can be checked.
time to exit
Implements pfunc::detail::taskmgr_virtual_base.
pfunc::detail::taskmgr< SchedPolicyName, Task >::PFUNC_DEFINE_EXCEPT_PTR | ( | ) | [inline, private] |
Used to wrap around testing_compl.test()
< Completion event
[in,out] | compl_event | A testable completion event reference. |
void pfunc::detail::taskmgr< SchedPolicyName, Task >::progress_barrier | ( | ) | [inline, virtual] |
Picks up a task (non-exit) to execute while waiting on a barrier to complete.
Implements pfunc::detail::taskmgr_virtual_base.
void pfunc::detail::taskmgr< SchedPolicyName, Task >::progress_wait | ( | event< testable_event > & | compl_event | ) | [inline, virtual] |
Picks up and executes other tasks (non-exit) while waiting on another task to complete. The other task's completion is signalled using the input parameter (testable event).
[in] | compl_event | A testable event that signals completion of the task that we are waiting on. |
Create the predicate from the event
We need to handle a non-PFunc task trying to progress PFunc's tasks right here because it does not fly! Typically, users are required to just set "nested=false" in the task's attribute when using main thread. However, for people who do insist on committing this folly, we will, for now, simply keep yielding and testing till the task is complete. There seems to be no other good solution for now
PFunc's thread
Implements pfunc::detail::taskmgr_virtual_base.
void pfunc::detail::taskmgr< SchedPolicyName, Task >::set_max_attempts | ( | const unsigned int & | max_attempts | ) | [inline, virtual] |
Set the maximum number of attempts before backoff -- ~0x0 by default
[in] | max_attempts | The new value of task_max_attempts. |
Implements pfunc::detail::taskmgr_virtual_base.
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task | ( | task & | new_task, |
functor & | new_work | ||
) | [inline] |
spawn_task
This function is used to add a C++-style object to the queue.
[in,out] | new_task | The new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive. |
[in] | new_attr | The attributes that dictate execution of the task. |
[in] | new_group | The group to be associated with the handle. |
[in] | new_work | The function object that represents the code to be executed. |
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task | ( | task & | new_task, |
const attribute & | new_attr, | ||
functor & | new_work | ||
) | [inline] |
spawn_task
This function is used to add a C++-style object to the queue.
[in,out] | new_task | The new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive. |
[in] | new_attr | The attributes that dictate execution of the task. |
[in] | new_group | The group to be associated with the handle. |
[in] | new_work | The function object that represents the code to be executed. |
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task | ( | task & | new_task, |
const attribute & | new_attr, | ||
group & | new_group, | ||
functor & | new_work | ||
) | [inline] |
spawn_task
This function is used to add a C++-style object to the queue.
[in,out] | new_task | The new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive. |
[in] | new_attr | The attributes that dictate execution of the task. |
[in] | new_group | The group to be associated with the handle. |
[in] | new_work | The function object that represents the code to be executed. |
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task | ( | void * | new_task, |
void * | new_work | ||
) | [inline, virtual] |
spawn_task
This function is used to spawn a new task -- only from the virtual base class. So, all the pointers are void* and we have to cast them back into the right type. God save the typesafety in this case.
[in,out] | new_task | The new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive. |
[in] | new_work | The function object that represents the code to be executed. |
Implements pfunc::detail::taskmgr_virtual_base.
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task | ( | void * | new_task, |
void * | new_attr, | ||
void * | new_work | ||
) | [inline, virtual] |
spawn_task
This function is used to spawn a new task -- only from the virtual base class. So, all the pointers are void* and we have to cast them back into the right type. God save the typesafety in this case.
[in,out] | new_task | The new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive. |
[in] | new_attr | The attributes that dictate execution of the task. |
[in] | new_work | The function object that represents the code to be executed. |
Implements pfunc::detail::taskmgr_virtual_base.
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task | ( | void * | new_task, |
void * | new_attr, | ||
void * | new_group, | ||
void * | new_work | ||
) | [inline, virtual] |
spawn_task
This function is used to spawn a new task -- only from the virtual base class. So, all the pointers are void* and we have to cast them back into the right type. God save the typesafety in this case.
[in,out] | new_task | The new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive. |
[in] | new_attr | The attributes that dictate execution of the task. |
[in] | new_group | The group to be associated with the handle. |
[in] | new_work | The function object that represents the code to be executed. |
Implements pfunc::detail::taskmgr_virtual_base.
const attribute pfunc::detail::taskmgr< SchedPolicyName, Task >::default_attribute |
used as default during spawn
group pfunc::detail::taskmgr< SchedPolicyName, Task >::default_group |
used as default during spawn
thread_attr* pfunc::detail::taskmgr< SchedPolicyName, Task >::main_thread_attr [private] |
We will set some defaults for the main thread
const unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::num_queues [private] |
Number of task queues to create
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::num_threads [private] |
Number of work threads to create
barrier pfunc::detail::taskmgr< SchedPolicyName, Task >::start_up_barrier [private] |
Ensures all threads start together
task* pfunc::detail::taskmgr< SchedPolicyName, Task >::task_cache [private] |
Used to extract the closest possible match
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::task_max_attempts [private] |
Number of attempts before backoff
queue_type* pfunc::detail::taskmgr< SchedPolicyName, Task >::task_queue [private] |
reroute_function_arg** pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_args [private] |
Arguments to reroute_function
thread_attr** pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_data [private] |
Startup information for the threads
thread_handle_type* pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_handles [private] |
thread handles
thread pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_manager [private] |
Creates and manages threads
volatile unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_start_count [private] |
Used to ensure all threads start
aligned_bool* pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_state [private] |
Denote thread cancellations
unsigned int* pfunc::detail::taskmgr< SchedPolicyName, Task >::threads_per_queue [private] |
Holds the number of threads waiting on each queue