PFUNC  1.0
Classes | Public Types | Public Member Functions | Public Attributes | Private Member Functions | Private Attributes
pfunc::detail::taskmgr< SchedPolicyName, Task > Struct Template Reference

Main class that implements the tasking aspect. More...

#include <pfunc/taskmgr.hpp>

Inheritance diagram for pfunc::detail::taskmgr< SchedPolicyName, Task >:
pfunc::detail::taskmgr_virtual_base

List of all members.

Classes

struct  aligned_bool

Public Types

typedef Task task
typedef task::functor functor
typedef SchedPolicyName sched_policy_name
typedef task_queue_set
< sched_policy_name, task
queue_type
typedef task::attribute attribute
typedef attribute::priority_type priority_type
typedef
thread::native_thread_id_type 
native_thread_id_type
typedef regular_predicate_pair
< sched_policy_name, task
regular_predicate
typedef waiting_predicate_pair
< sched_policy_name, task
waiting_predicate
typedef group_predicate_pair
< sched_policy_name, task
group_predicate
typedef thread::thread_handle_type thread_handle_type

Public Member Functions

unsigned int current_thread_id ()
 Returns information regarding the current thread.
taskcurrent_task_information ()
 Returns information regarding the current task being executed.
unsigned int current_task_group_rank ()
 Returns the rank of the calling task in its group.
unsigned int current_task_group_size ()
 Returns the size of the calling task's group.
void current_task_group_barrier ()
 Executes a barrier accross the group of the currently executing task (and hence, thread). Most of the details regarding the barrier are stored with the task -- so we just refer back to the current task that we cache.
void spawn_task (task &new_task, functor &new_work)
void spawn_task (task &new_task, const attribute &new_attr, functor &new_work)
void spawn_task (task &new_task, const attribute &new_attr, group &new_group, functor &new_work)
void spawn_task (void *new_task, void *new_work)
void spawn_task (void *new_task, void *new_attr, void *new_work)
void spawn_task (void *new_task, void *new_attr, void *new_group, void *new_work)
 taskmgr (const unsigned int &num_queues, const unsigned int *thds_per_queue, const unsigned int **affinity=NULL)
 Create all the threads. Make them wait on the task queue.
virtual ~taskmgr ()
void set_max_attempts (const unsigned int &max_attempts)
unsigned int get_max_attempts () const
template<typename CompletionPredicate , typename TaskPredicate >
taskget_task (const CompletionPredicate &completion_pred, const unsigned int &max_attempts, const unsigned int &queue_number, const TaskPredicate &task_pred)
void operator() (void *_my_attr)
void progress_wait (event< testable_event > &compl_event)
void progress_barrier ()
unsigned int get_num_queues () const
unsigned int get_num_threads () const

Public Attributes

const attribute default_attribute
group default_group

Private Member Functions

 PFUNC_DEFINE_EXCEPT_PTR () struct task_completion_predicate

Private Attributes

const unsigned int num_queues
unsigned int num_threads
unsigned int * threads_per_queue
queue_typetask_queue
thread_handle_typethread_handles
thread_attr ** thread_data
tasktask_cache
reroute_function_arg ** thread_args
volatile unsigned int thread_start_count
thread_attrmain_thread_attr
thread thread_manager
barrier start_up_barrier
aligned_boolthread_state
unsigned int task_max_attempts

Detailed Description

template<typename SchedPolicyName, typename Task>
struct pfunc::detail::taskmgr< SchedPolicyName, Task >

Main class that implements the tasking aspect.

Parameters:
SchedPolicyNameThe scheduling policy to use.
TaskThe type of the task to use.

This is the main struct that implements the functionality provided in pfunc tool kit. Of the many things implemented in this class, the important functions are: 1. Create X number of threads and Y number of queues. 2. Add a task to a queue. 3. Wait for completions for those added tasks.


Member Typedef Documentation

template<typename SchedPolicyName , typename Task >
typedef task::attribute pfunc::detail::taskmgr< SchedPolicyName, Task >::attribute

To know the attribute

template<typename SchedPolicyName , typename Task >
typedef task::functor pfunc::detail::taskmgr< SchedPolicyName, Task >::functor

Type of the functor

template<typename SchedPolicyName , typename Task >
typedef group_predicate_pair<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::group_predicate
template<typename SchedPolicyName , typename Task >
typedef thread::native_thread_id_type pfunc::detail::taskmgr< SchedPolicyName, Task >::native_thread_id_type

used for storage

template<typename SchedPolicyName , typename Task >
typedef attribute::priority_type pfunc::detail::taskmgr< SchedPolicyName, Task >::priority_type

To know what priority exit_job

template<typename SchedPolicyName , typename Task >
typedef task_queue_set<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::queue_type

scheduler

template<typename SchedPolicyName , typename Task >
typedef regular_predicate_pair<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::regular_predicate
template<typename SchedPolicyName , typename Task >
typedef SchedPolicyName pfunc::detail::taskmgr< SchedPolicyName, Task >::sched_policy_name

Type of scheduler in use

template<typename SchedPolicyName , typename Task >
typedef Task pfunc::detail::taskmgr< SchedPolicyName, Task >::task

type of task

template<typename SchedPolicyName , typename Task >
typedef thread::thread_handle_type pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_handle_type
template<typename SchedPolicyName , typename Task >
typedef waiting_predicate_pair<sched_policy_name, task> pfunc::detail::taskmgr< SchedPolicyName, Task >::waiting_predicate

Constructor & Destructor Documentation

template<typename SchedPolicyName , typename Task >
pfunc::detail::taskmgr< SchedPolicyName, Task >::taskmgr ( const unsigned int &  num_queues,
const unsigned int *  thds_per_queue,
const unsigned int **  affinity = NULL 
) [inline]

Create all the threads. Make them wait on the task queue.

Constructor

Parameters:
[in]num_queuesNumber of queues to create.
[in]thds_per_queueNumber of threads to create per queue.
[in]perf_dataPerformance metrics to be collected.
[in]affinityThe affinity of the threads to the proceseors.
template<typename SchedPolicyName , typename Task >
virtual pfunc::detail::taskmgr< SchedPolicyName, Task >::~taskmgr ( ) [inline, virtual]

Destructor

Cancel all the threads

Wait for their completion


Member Function Documentation

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_group_barrier ( ) [inline, virtual]

Executes a barrier accross the group of the currently executing task (and hence, thread). Most of the details regarding the barrier are stored with the task -- so we just refer back to the current task that we cache.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_group_rank ( ) [inline, virtual]

Returns the rank of the calling task in its group.

We allow each task to get its rank and size from the running environment. This is possible since we always cache each thread's currently executing task.

Returns:
Rank of the task being currently executed in its group.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_group_size ( ) [inline, virtual]

Returns the size of the calling task's group.

We allow each task to get its rank and size from the running environment. This is possible since we always cache each thread's currently executing task.

Returns:
Size of the task being currently executed in its group.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
task* pfunc::detail::taskmgr< SchedPolicyName, Task >::current_task_information ( ) [inline]

Returns information regarding the current task being executed.

Every thread stores the current task that it is executing. This means that we can get the handle, group and work attributes associated with every task that a particular thread is executing. Hence, tasks can query this information without needing actually explictly passing any information.

Returns:
Pointer to the task being currently executed.
template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::current_thread_id ( ) [inline, virtual]

Returns information regarding the current thread.

Every task is being executed by some thread or the other. This function returns the ID of the thread that is being executed. This is an uint. If the thread querying for information is NOT a PFunc thread, we return -1

Returns:
ID of the currently executing thread (0 -- NUM_THREADS-1)

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::get_max_attempts ( ) const [inline, virtual]

Get the maximum number of attempts before backoff -- ~0x0 by default

Returns:
The current value of task_max_attempts.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::get_num_queues ( ) const [inline, virtual]
Returns:
the total number of queues created.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::get_num_threads ( ) const [inline, virtual]
Returns:
the total number of threads created.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
template<typename CompletionPredicate , typename TaskPredicate >
task* pfunc::detail::taskmgr< SchedPolicyName, Task >::get_task ( const CompletionPredicate &  completion_pred,
const unsigned int &  max_attempts,
const unsigned int &  queue_number,
const TaskPredicate &  task_pred 
) [inline]

Function that retrieves a task from the task_queue (preferably from the thread's own) with some amount of regulation builtin. The regulation is that if we cannot find a suitable task for a X number of attempts, we relinquish control of the processor and try back with X/2 attempts. Not quite an exponential backoff, but it does for now.

Parameters:
[in]completion_predA boolean predicate that signals the completion of the waiting.
[in]max_attemptsThe maximum number of attempts to make.
[in]queue_numberThe primary queue number for the calling thread.
[in]task_predThe predicate based on which the task is selected.
Returns:
A pointer to the task that needs to be executed.

Anju: Bug Fix: Earlier, when we ran out of the number of attempts, we would recursively call get_task with half as many attempts. When there is no task to pick from the queue for a long time, this results in a long long recursion stack, eventually leading to explosion. Fixing this by having two loops --- ugly but works. Also, notice that this depends on the completion_pred() being twice testable. I am assuming that this is OK since the originally loop tested the completion predicate multiple times.

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::operator() ( void *  _my_attr) [inline, virtual]

Endless loop that the worker threads execute

Parameters:
[in]_my_attrthread_attr* cast as a void*.

This function is invoked at the start of each thread. As such, each thread waits in an endless loop for work. The only piece of information that this thread needs is the JOBQUEUE which it is supposed to wait on. Also, an ID of some form would be nice.

Upon completion of the job, it sets the status of the job to in a manner which can be checked.

time to exit

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
pfunc::detail::taskmgr< SchedPolicyName, Task >::PFUNC_DEFINE_EXCEPT_PTR ( ) [inline, private]

Used to wrap around testing_compl.test()

< Completion event

Parameters:
[in,out]compl_eventA testable completion event reference.
Returns:
true If the task completed.
false If the task has not completed.
template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::progress_barrier ( ) [inline, virtual]

Picks up a task (non-exit) to execute while waiting on a barrier to complete.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::progress_wait ( event< testable_event > &  compl_event) [inline, virtual]

Picks up and executes other tasks (non-exit) while waiting on another task to complete. The other task's completion is signalled using the input parameter (testable event).

Parameters:
[in]compl_eventA testable event that signals completion of the task that we are waiting on.

Create the predicate from the event

We need to handle a non-PFunc task trying to progress PFunc's tasks right here because it does not fly! Typically, users are required to just set "nested=false" in the task's attribute when using main thread. However, for people who do insist on committing this folly, we will, for now, simply keep yielding and testing till the task is complete. There seems to be no other good solution for now

PFunc's thread

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::set_max_attempts ( const unsigned int &  max_attempts) [inline, virtual]

Set the maximum number of attempts before backoff -- ~0x0 by default

Parameters:
[in]max_attemptsThe new value of task_max_attempts.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task ( task new_task,
functor new_work 
) [inline]

spawn_task

This function is used to add a C++-style object to the queue.

Parameters:
[in,out]new_taskThe new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive.
[in]new_attrThe attributes that dictate execution of the task.
[in]new_groupThe group to be associated with the handle.
[in]new_workThe function object that represents the code to be executed.
template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task ( task new_task,
const attribute new_attr,
functor new_work 
) [inline]

spawn_task

This function is used to add a C++-style object to the queue.

Parameters:
[in,out]new_taskThe new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive.
[in]new_attrThe attributes that dictate execution of the task.
[in]new_groupThe group to be associated with the handle.
[in]new_workThe function object that represents the code to be executed.
template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task ( task new_task,
const attribute new_attr,
group new_group,
functor new_work 
) [inline]

spawn_task

This function is used to add a C++-style object to the queue.

Parameters:
[in,out]new_taskThe new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive.
[in]new_attrThe attributes that dictate execution of the task.
[in]new_groupThe group to be associated with the handle.
[in]new_workThe function object that represents the code to be executed.
template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task ( void *  new_task,
void *  new_work 
) [inline, virtual]

spawn_task

This function is used to spawn a new task -- only from the virtual base class. So, all the pointers are void* and we have to cast them back into the right type. God save the typesafety in this case.

Parameters:
[in,out]new_taskThe new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive.
[in]new_workThe function object that represents the code to be executed.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task ( void *  new_task,
void *  new_attr,
void *  new_work 
) [inline, virtual]

spawn_task

This function is used to spawn a new task -- only from the virtual base class. So, all the pointers are void* and we have to cast them back into the right type. God save the typesafety in this case.

Parameters:
[in,out]new_taskThe new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive.
[in]new_attrThe attributes that dictate execution of the task.
[in]new_workThe function object that represents the code to be executed.

Implements pfunc::detail::taskmgr_virtual_base.

template<typename SchedPolicyName , typename Task >
void pfunc::detail::taskmgr< SchedPolicyName, Task >::spawn_task ( void *  new_task,
void *  new_attr,
void *  new_group,
void *  new_work 
) [inline, virtual]

spawn_task

This function is used to spawn a new task -- only from the virtual base class. So, all the pointers are void* and we have to cast them back into the right type. God save the typesafety in this case.

Parameters:
[in,out]new_taskThe new task to be added. new_task must have a lifetime atleast till the wait on this task is complete. No copy is made as it is expensive.
[in]new_attrThe attributes that dictate execution of the task.
[in]new_groupThe group to be associated with the handle.
[in]new_workThe function object that represents the code to be executed.

Implements pfunc::detail::taskmgr_virtual_base.


Member Data Documentation

template<typename SchedPolicyName , typename Task >
const attribute pfunc::detail::taskmgr< SchedPolicyName, Task >::default_attribute

used as default during spawn

template<typename SchedPolicyName , typename Task >
group pfunc::detail::taskmgr< SchedPolicyName, Task >::default_group

used as default during spawn

template<typename SchedPolicyName , typename Task >
thread_attr* pfunc::detail::taskmgr< SchedPolicyName, Task >::main_thread_attr [private]

We will set some defaults for the main thread

template<typename SchedPolicyName , typename Task >
const unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::num_queues [private]

Number of task queues to create

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::num_threads [private]

Number of work threads to create

template<typename SchedPolicyName , typename Task >
barrier pfunc::detail::taskmgr< SchedPolicyName, Task >::start_up_barrier [private]

Ensures all threads start together

template<typename SchedPolicyName , typename Task >
task* pfunc::detail::taskmgr< SchedPolicyName, Task >::task_cache [private]

Used to extract the closest possible match

template<typename SchedPolicyName , typename Task >
unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::task_max_attempts [private]

Number of attempts before backoff

template<typename SchedPolicyName , typename Task >
queue_type* pfunc::detail::taskmgr< SchedPolicyName, Task >::task_queue [private]
template<typename SchedPolicyName , typename Task >
reroute_function_arg** pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_args [private]

Arguments to reroute_function

template<typename SchedPolicyName , typename Task >
thread_attr** pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_data [private]

Startup information for the threads

template<typename SchedPolicyName , typename Task >
thread_handle_type* pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_handles [private]

thread handles

template<typename SchedPolicyName , typename Task >
thread pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_manager [private]

Creates and manages threads

template<typename SchedPolicyName , typename Task >
volatile unsigned int pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_start_count [private]

Used to ensure all threads start

template<typename SchedPolicyName , typename Task >
aligned_bool* pfunc::detail::taskmgr< SchedPolicyName, Task >::thread_state [private]

Denote thread cancellations

template<typename SchedPolicyName , typename Task >
unsigned int* pfunc::detail::taskmgr< SchedPolicyName, Task >::threads_per_queue [private]

Holds the number of threads waiting on each queue


The documentation for this struct was generated from the following file: