PFUNC  1.0
pfunc/parallel_reduce.hpp
Go to the documentation of this file.
00001 #ifndef PFUNC_PARALLEL_REDUCE_HPP
00002 #define PFUNC_PARALLEL_REDUCE_HPP
00003 
00004 #include <pfunc/pfunc.hpp>
00005 #include <iostream>
00006 
00007 namespace pfunc {
00008 
00028 template <typename PFuncInstanceType /*type of PFunc instantiated*/,
00029           typename ReduceExecutable/*type of the function*/,
00030           typename SpaceType /*type of the space*/>
00031 struct parallel_reduce : pfunc::virtual_functor {
00032   public:
00033   typedef typename PFuncInstanceType::taskmgr TaskMgrType;
00034   typedef typename PFuncInstanceType::task TaskType;
00035 
00036   private:
00037   SpaceType space; 
00038   ReduceExecutable& func;
00039   TaskMgrType& taskmgr;
00040 
00041   public:
00050   parallel_reduce (SpaceType space, 
00051                    ReduceExecutable& func,
00052                    TaskMgrType& taskmgr) : space(space), 
00053                                            func(func), 
00054                                            taskmgr (taskmgr) {}
00055 
00056   void operator() (void) {
00057     if (space.can_split ()) {
00058       // Split into subspaces.
00059       typename SpaceType::subspace_container subspaces = space.split ();
00060       assert (SpaceType::arity>=1);
00061 
00062       // Create a vector of tasks for each subspace but the first one.
00063       const int num_tasks = SpaceType::arity-1;
00064       TaskType subspace_tasks [num_tasks];
00065       parallel_reduce<PFuncInstanceType, ReduceExecutable, SpaceType>*
00066         subspace_parallel_reducers [num_tasks];
00067 
00068       // Split func and create a vector of functors one for each task.
00069       std::vector<ReduceExecutable> split_funcs;
00070       for (int i=0; i<num_tasks; ++i) 
00071         split_funcs.push_back (ReduceExecutable(func.split ()));
00072 
00073       // Save the first task to execute yourself, but do this last.
00074       typename SpaceType::subspace_container::iterator first=subspaces.begin();
00075       space = *first++;
00076 
00077       // Iterate and launch the tasks
00078       int task_index = 0;
00079       while (first != subspaces.end()) {
00080         subspace_parallel_reducers [task_index] = new 
00081           parallel_reduce<PFuncInstanceType, ReduceExecutable, SpaceType> 
00082             (*first++, split_funcs[task_index], taskmgr);
00083         pfunc::spawn (taskmgr, // the task manager to use
00084                       subspace_tasks[task_index], // task handle
00085                       *(subspace_parallel_reducers[task_index])); 
00086                         // the subspace for this comptn
00087         ++task_index;
00088       }
00089 
00090       (*this)(); // executing this loop ourselves.
00091 
00092       // Wait for completion of other tasks
00093       pfunc::wait_all (taskmgr, // the task manager to use
00094                        subspace_tasks, // beginning
00095                        subspace_tasks+num_tasks); // end
00096 
00097       // Join everything
00098       for (int i=0; i<num_tasks; ++i) { 
00099         func.join (split_funcs[i]);
00100         delete subspace_parallel_reducers[i];
00101       }
00102     } else {
00103       // No more splitting --- simply invoke the function on the given space.
00104       func (space);
00105     }
00106   }
00107 };
00108 
00109 } // namespace pfunc
00110 
00111 #endif // PFUNC_PARALLEL_REDUCE_HPP