PFUNC
1.0
|
00001 #ifndef PFUNC_PRIO_T_HPP 00002 #define PFUNC_PRIO_T_HPP 00003 00004 #ifndef PFUNC_SCHEDULER_HPP 00005 #error "This file can only be included from task_queue_set.hpp" 00006 #endif 00007 00008 #include <queue> 00009 #include <vector> 00010 #include <pfunc/mutex.hpp> 00011 #include <pfunc/environ.hpp> 00012 #include <pfunc/task.hpp> 00013 #include <pfunc/exception.hpp> 00014 00015 namespace pfunc { namespace detail { 00020 template <typename T> 00021 struct task_traits { 00022 typedef typename T::attribute attribute; 00023 typedef typename T::functor functor; 00024 }; 00025 00030 template <typename Attribute, typename Functor> 00031 struct compare_task_ptr { 00032 00033 typedef bool result_type; 00034 typedef task<Attribute, Functor>* first_argument_type; 00035 typedef first_argument_type second_argument_type ; 00036 00049 bool operator()(first_argument_type ptr1, 00050 second_argument_type ptr2) const { 00051 typename Attribute::compare_type comp; 00052 return comp (ptr1->get_attr().get_priority(), 00053 ptr2->get_attr().get_priority()); 00054 } 00055 }; 00056 00060 template <typename ValueType> 00061 struct task_queue_set <prioS, ValueType> { 00062 typedef typename task_traits<ValueType>::attribute attribute; 00063 typedef typename task_traits<ValueType>::functor functor; 00064 typedef compare_task_ptr<attribute, functor> compare_type; 00065 typedef std::priority_queue<ValueType*, 00066 std::vector<ValueType*>, 00067 compare_type> queue_type; 00068 typedef typename queue_type::value_type value_type; 00069 typedef unsigned int queue_index_type; 00070 typedef task_queue_set_data<queue_type> data_type; 00072 ALIGN128 data_type* data; 00073 ALIGN128 unsigned int num_queues; 00074 PFUNC_DEFINE_EXCEPT_PTR() 00075 00076 00081 task_queue_set (unsigned int num_queues) PFUNC_CONSTRUCTOR_TRY_BLOCK() : 00082 num_queues (num_queues) PFUNC_EXCEPT_PTR_INIT() { 00083 PFUNC_START_TRY_BLOCK() 00084 data = new data_type [num_queues]; 00085 PFUNC_END_TRY_BLOCK() 00086 PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set) 00087 } 00088 PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set) 00089 00090 00093 ~task_queue_set () { 00094 PFUNC_START_TRY_BLOCK() 00095 delete [] data; 00096 PFUNC_EXCEPT_PTR_CLEAR() 00097 PFUNC_END_TRY_BLOCK() 00098 PFUNC_CATCH_AND_RETHROW(task_queue_set,~task_queue_set) 00099 } 00100 00115 template <typename TaskPredicatePair> 00116 bool test_and_get (queue_index_type queue_num, 00117 const TaskPredicatePair& cnd, 00118 value_type& value, 00119 bool own_queue) { 00120 bool ret_val = false; 00121 00122 PFUNC_START_TRY_BLOCK() 00123 queue_type& queue = data[queue_num].queue; 00124 mutex& lock = data[queue_num].lock; 00125 00126 lock.lock (); 00127 if (!queue.empty () && 00128 ((own_queue)?cnd.own_pred(queue.top()):cnd.steal_pred(queue.top()))) { 00129 value = queue.top (); 00130 queue.pop (); 00131 ret_val = true; 00132 } 00133 lock.unlock (); 00134 00135 PFUNC_END_TRY_BLOCK() 00136 PFUNC_CATCH_AND_RETHROW(task_queue_set,test_and_get) 00137 00138 return ret_val; 00139 } 00140 00155 template <typename TaskPredicatePair> 00156 value_type get (queue_index_type queue_num, 00157 const TaskPredicatePair& cnd) { 00158 value_type task = NULL; 00159 00160 PFUNC_START_TRY_BLOCK() 00161 for (int i=queue_num, num_attempts = 0; 00162 num_attempts < static_cast<int>(num_queues); 00163 ++i, ++num_attempts) { 00164 const unsigned int real_i = i % num_queues; 00165 if (test_and_get (real_i, cnd, task, (real_i==queue_num))) break; 00166 } 00167 00168 PFUNC_END_TRY_BLOCK() 00169 PFUNC_CATCH_AND_RETHROW(task_queue_set,get) 00170 00171 return task; 00172 } 00173 00181 void put (queue_index_type queue_num, const value_type& value) { 00182 PFUNC_START_TRY_BLOCK() 00183 queue_type& queue = data[queue_num].queue; 00184 mutex& lock = data[queue_num].lock; 00185 00186 lock.lock (); 00187 queue.push (value); 00188 lock.unlock (); 00189 00190 PFUNC_END_TRY_BLOCK() 00191 PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set) 00192 } 00193 }; 00194 } /* namespace detail */ } /* namespace pfunc */ 00195 00196 #endif /* PFUNC_PRIO_T_HPP */