PFUNC
1.0
|
00001 #ifndef PFUNC_PARALLEL_REDUCE_HPP 00002 #define PFUNC_PARALLEL_REDUCE_HPP 00003 00004 #include <pfunc/pfunc.hpp> 00005 #include <iostream> 00006 00007 namespace pfunc { 00008 00028 template <typename PFuncInstanceType /*type of PFunc instantiated*/, 00029 typename ReduceExecutable/*type of the function*/, 00030 typename SpaceType /*type of the space*/> 00031 struct parallel_reduce : pfunc::virtual_functor { 00032 public: 00033 typedef typename PFuncInstanceType::taskmgr TaskMgrType; 00034 typedef typename PFuncInstanceType::task TaskType; 00035 00036 private: 00037 SpaceType space; 00038 ReduceExecutable& func; 00039 TaskMgrType& taskmgr; 00040 00041 public: 00050 parallel_reduce (SpaceType space, 00051 ReduceExecutable& func, 00052 TaskMgrType& taskmgr) : space(space), 00053 func(func), 00054 taskmgr (taskmgr) {} 00055 00056 void operator() (void) { 00057 if (space.can_split ()) { 00058 // Split into subspaces. 00059 typename SpaceType::subspace_container subspaces = space.split (); 00060 assert (SpaceType::arity>=1); 00061 00062 // Create a vector of tasks for each subspace but the first one. 00063 const int num_tasks = SpaceType::arity-1; 00064 TaskType subspace_tasks [num_tasks]; 00065 parallel_reduce<PFuncInstanceType, ReduceExecutable, SpaceType>* 00066 subspace_parallel_reducers [num_tasks]; 00067 00068 // Split func and create a vector of functors one for each task. 00069 std::vector<ReduceExecutable> split_funcs; 00070 for (int i=0; i<num_tasks; ++i) 00071 split_funcs.push_back (ReduceExecutable(func.split ())); 00072 00073 // Save the first task to execute yourself, but do this last. 00074 typename SpaceType::subspace_container::iterator first=subspaces.begin(); 00075 space = *first++; 00076 00077 // Iterate and launch the tasks 00078 int task_index = 0; 00079 while (first != subspaces.end()) { 00080 subspace_parallel_reducers [task_index] = new 00081 parallel_reduce<PFuncInstanceType, ReduceExecutable, SpaceType> 00082 (*first++, split_funcs[task_index], taskmgr); 00083 pfunc::spawn (taskmgr, // the task manager to use 00084 subspace_tasks[task_index], // task handle 00085 *(subspace_parallel_reducers[task_index])); 00086 // the subspace for this comptn 00087 ++task_index; 00088 } 00089 00090 (*this)(); // executing this loop ourselves. 00091 00092 // Wait for completion of other tasks 00093 pfunc::wait_all (taskmgr, // the task manager to use 00094 subspace_tasks, // beginning 00095 subspace_tasks+num_tasks); // end 00096 00097 // Join everything 00098 for (int i=0; i<num_tasks; ++i) { 00099 func.join (split_funcs[i]); 00100 delete subspace_parallel_reducers[i]; 00101 } 00102 } else { 00103 // No more splitting --- simply invoke the function on the given space. 00104 func (space); 00105 } 00106 } 00107 }; 00108 00109 } // namespace pfunc 00110 00111 #endif // PFUNC_PARALLEL_REDUCE_HPP