PFUNC 1.0
|
00001 #ifndef PFUNC_FIFO_HPP 00002 #define PFUNC_FIFO_HPP 00003 00004 #ifndef PFUNC_SCHEDULER_HPP 00005 #error "This file can only be included from task_queue_set.hpp" 00006 #endif 00007 00008 #include <queue> 00009 #include <pfunc/mutex.hpp> 00010 #include <pfunc/environ.hpp> 00011 #include <pfunc/exception.hpp> 00012 00013 namespace pfunc { namespace detail { 00014 00018 template <typename ValueType> 00019 struct task_queue_set <fifoS, ValueType> { 00020 typedef std::queue<ValueType*> queue_type; 00021 typedef typename queue_type::value_type value_type; 00022 typedef unsigned int queue_index_type; 00023 typedef task_queue_set_data<queue_type> data_type; 00025 ALIGN128 data_type* data; 00026 ALIGN128 unsigned int num_queues; 00027 PFUNC_DEFINE_EXCEPT_PTR() 00028 00029 00034 task_queue_set (unsigned int num_queues) PFUNC_CONSTRUCTOR_TRY_BLOCK(): 00035 num_queues (num_queues) PFUNC_EXCEPT_PTR_INIT() { 00036 PFUNC_START_TRY_BLOCK() 00037 data = new data_type [num_queues]; 00038 PFUNC_END_TRY_BLOCK() 00039 PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set) 00040 } 00041 PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set) 00042 00043 00046 ~task_queue_set ( ) { 00047 PFUNC_START_TRY_BLOCK() 00048 delete [] data; 00049 PFUNC_EXCEPT_PTR_CLEAR() 00050 PFUNC_END_TRY_BLOCK() 00051 PFUNC_CATCH_AND_RETHROW(task_queue_set,~task_queue_set) 00052 } 00053 00068 template <typename TaskPredicatePair> 00069 bool test_and_get (queue_index_type queue_num, 00070 const TaskPredicatePair& cnd, 00071 value_type& value, 00072 bool own_queue) { 00073 bool ret_val = false; 00074 00075 PFUNC_START_TRY_BLOCK() 00076 queue_type& queue = data[queue_num].queue; 00077 mutex& lock = data[queue_num].lock; 00078 00079 lock.lock (); 00080 if (!queue.empty () && 00081 ((own_queue)?cnd.own_pred(queue.front()):cnd.steal_pred(queue.front()))) { 00082 value = queue.front (); 00083 queue.pop (); 00084 ret_val = true; 00085 } 00086 lock.unlock (); 00087 00088 PFUNC_END_TRY_BLOCK() 00089 PFUNC_CATCH_AND_RETHROW(scheduling,test_and_get) 00090 00091 return ret_val; 00092 } 00093 00106 template <typename TaskPredicatePair> 00107 value_type get (queue_index_type queue_num, 00108 const TaskPredicatePair& cnd) { 00109 value_type task = NULL; 00110 00111 PFUNC_START_TRY_BLOCK() 00112 for (int i=queue_num, num_attempts = 0; 00113 num_attempts < static_cast<int>(num_queues); 00114 ++i, ++num_attempts) { 00115 const unsigned int real_i = i % num_queues; 00116 if (test_and_get (real_i, cnd, task, (real_i==queue_num))) break; 00117 } 00118 00119 PFUNC_END_TRY_BLOCK() 00120 PFUNC_CATCH_AND_RETHROW(task_queue_set,get) 00121 00122 return task; 00123 } 00124 00132 void put (queue_index_type queue_num, const value_type& value) { 00133 PFUNC_START_TRY_BLOCK() 00134 queue_type& queue = data[queue_num].queue; 00135 mutex& lock = data[queue_num].lock; 00136 00137 lock.lock (); 00138 queue.push (value); 00139 lock.unlock (); 00140 00141 PFUNC_END_TRY_BLOCK() 00142 PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set) 00143 } 00144 }; 00145 } /* namespace detail */ } /* namespace pfunc */ 00146 00147 #endif /* PFUNC_FIFO_HPP */