PFUNC  1.0
pfunc/prio.hpp
Go to the documentation of this file.
00001 #ifndef PFUNC_PRIO_T_HPP
00002 #define PFUNC_PRIO_T_HPP
00003 
00004 #ifndef PFUNC_SCHEDULER_HPP
00005 #error "This file can only be included from task_queue_set.hpp"
00006 #endif
00007 
00008 #include <queue> 
00009 #include <vector>
00010 #include <pfunc/mutex.hpp>
00011 #include <pfunc/environ.hpp>
00012 #include <pfunc/task.hpp>
00013 #include <pfunc/exception.hpp>
00014 
00015 namespace pfunc { namespace detail {
00020   template <typename T>
00021   struct task_traits {
00022     typedef typename T::attribute attribute; 
00023     typedef typename T::functor functor; 
00024   };
00025 
00030   template <typename Attribute, typename Functor>
00031   struct compare_task_ptr {
00032 
00033     typedef bool result_type; 
00034     typedef task<Attribute, Functor>* first_argument_type; 
00035     typedef first_argument_type second_argument_type ;
00036 
00049     bool operator()(first_argument_type ptr1,
00050                     second_argument_type ptr2) const {
00051       typename Attribute::compare_type comp;
00052       return comp (ptr1->get_attr().get_priority(), 
00053                    ptr2->get_attr().get_priority());
00054     }
00055   };
00056 
00060   template <typename ValueType>
00061   struct task_queue_set <prioS, ValueType> {
00062     typedef typename task_traits<ValueType>::attribute attribute; 
00063     typedef typename task_traits<ValueType>::functor functor; 
00064     typedef compare_task_ptr<attribute, functor> compare_type; 
00065     typedef std::priority_queue<ValueType*, 
00066                                 std::vector<ValueType*>, 
00067                                 compare_type> queue_type; 
00068     typedef typename queue_type::value_type value_type; 
00069     typedef unsigned int queue_index_type; 
00070     typedef task_queue_set_data<queue_type> data_type; 
00072     ALIGN128 data_type* data; 
00073     ALIGN128 unsigned int num_queues; 
00074     PFUNC_DEFINE_EXCEPT_PTR()
00075 
00076     
00081     task_queue_set (unsigned int num_queues) PFUNC_CONSTRUCTOR_TRY_BLOCK() : 
00082       num_queues (num_queues) PFUNC_EXCEPT_PTR_INIT() {
00083       PFUNC_START_TRY_BLOCK()
00084       data = new data_type [num_queues];
00085       PFUNC_END_TRY_BLOCK()
00086       PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set)
00087     }
00088     PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set)
00089 
00090     
00093     ~task_queue_set () { 
00094       PFUNC_START_TRY_BLOCK()
00095       delete [] data; 
00096       PFUNC_EXCEPT_PTR_CLEAR()
00097       PFUNC_END_TRY_BLOCK()
00098       PFUNC_CATCH_AND_RETHROW(task_queue_set,~task_queue_set)
00099     }
00100 
00115     template <typename TaskPredicatePair>
00116     bool test_and_get (queue_index_type queue_num, 
00117                        const TaskPredicatePair& cnd, 
00118                        value_type& value,
00119                        bool own_queue) {
00120       bool ret_val = false;
00121 
00122       PFUNC_START_TRY_BLOCK()
00123       queue_type& queue = data[queue_num].queue;
00124       mutex& lock = data[queue_num].lock;
00125 
00126       lock.lock ();
00127       if (!queue.empty () &&
00128           ((own_queue)?cnd.own_pred(queue.top()):cnd.steal_pred(queue.top()))) {
00129         value = queue.top ();
00130         queue.pop ();
00131         ret_val = true;
00132       }
00133       lock.unlock ();
00134 
00135       PFUNC_END_TRY_BLOCK()
00136       PFUNC_CATCH_AND_RETHROW(task_queue_set,test_and_get)
00137 
00138       return ret_val;
00139     }
00140 
00155     template <typename TaskPredicatePair>
00156     value_type get (queue_index_type queue_num, 
00157                     const TaskPredicatePair& cnd) {
00158       value_type task = NULL;
00159 
00160       PFUNC_START_TRY_BLOCK()
00161       for (int i=queue_num, num_attempts = 0; 
00162            num_attempts < static_cast<int>(num_queues);
00163            ++i, ++num_attempts) {
00164         const unsigned int real_i = i % num_queues;
00165         if (test_and_get (real_i, cnd, task, (real_i==queue_num))) break;
00166       }
00167 
00168       PFUNC_END_TRY_BLOCK()
00169       PFUNC_CATCH_AND_RETHROW(task_queue_set,get)
00170 
00171       return task;
00172     }
00173 
00181     void put (queue_index_type queue_num, const value_type& value) {
00182       PFUNC_START_TRY_BLOCK()
00183       queue_type& queue = data[queue_num].queue;
00184       mutex& lock = data[queue_num].lock;
00185 
00186       lock.lock ();
00187       queue.push (value);
00188       lock.unlock ();
00189 
00190       PFUNC_END_TRY_BLOCK()
00191       PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set)
00192     }
00193   };
00194 } /* namespace detail */ } /* namespace pfunc */
00195 
00196 #endif /* PFUNC_PRIO_T_HPP */