PFUNC  1.0
pfunc/fifo.hpp
Go to the documentation of this file.
00001 #ifndef PFUNC_FIFO_HPP
00002 #define PFUNC_FIFO_HPP
00003 
00004 #ifndef PFUNC_SCHEDULER_HPP
00005 #error "This file can only be included from task_queue_set.hpp"
00006 #endif
00007 
00008 #include <queue> 
00009 #include <pfunc/mutex.hpp>
00010 #include <pfunc/environ.hpp>
00011 #include <pfunc/exception.hpp>
00012 
00013 namespace pfunc { namespace detail {
00014 
00018   template <typename ValueType>
00019   struct task_queue_set <fifoS, ValueType> {
00020     typedef std::queue<ValueType*> queue_type; 
00021     typedef typename queue_type::value_type value_type; 
00022     typedef unsigned int queue_index_type; 
00023     typedef task_queue_set_data<queue_type> data_type; 
00025     ALIGN128 data_type* data; 
00026     ALIGN128 unsigned int num_queues; 
00027     PFUNC_DEFINE_EXCEPT_PTR()
00028 
00029     
00034     task_queue_set (unsigned int num_queues) PFUNC_CONSTRUCTOR_TRY_BLOCK(): 
00035       num_queues (num_queues) PFUNC_EXCEPT_PTR_INIT() {
00036       PFUNC_START_TRY_BLOCK()
00037       data = new data_type [num_queues];
00038       PFUNC_END_TRY_BLOCK()
00039       PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set)
00040     }
00041     PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set)
00042 
00043     
00046     ~task_queue_set ( ) { 
00047       PFUNC_START_TRY_BLOCK()
00048       delete [] data; 
00049       PFUNC_EXCEPT_PTR_CLEAR()
00050       PFUNC_END_TRY_BLOCK()
00051       PFUNC_CATCH_AND_RETHROW(task_queue_set,~task_queue_set)
00052     }
00053 
00068     template <typename TaskPredicatePair>
00069     bool test_and_get (queue_index_type queue_num, 
00070                        const TaskPredicatePair& cnd, 
00071                        value_type& value,
00072                        bool own_queue) {
00073       bool ret_val = false;
00074 
00075       PFUNC_START_TRY_BLOCK()
00076       queue_type& queue = data[queue_num].queue;
00077       mutex& lock = data[queue_num].lock;
00078 
00079       lock.lock ();
00080       if (!queue.empty () && 
00081           ((own_queue)?cnd.own_pred(queue.front()):cnd.steal_pred(queue.front()))) {
00082         value = queue.front ();
00083         queue.pop ();
00084         ret_val = true;
00085       }
00086       lock.unlock ();
00087 
00088       PFUNC_END_TRY_BLOCK()
00089       PFUNC_CATCH_AND_RETHROW(scheduling,test_and_get)
00090 
00091       return ret_val;
00092     }
00093 
00106     template <typename TaskPredicatePair>
00107     value_type get (queue_index_type queue_num, 
00108                     const TaskPredicatePair& cnd) {
00109       value_type task = NULL;
00110 
00111       PFUNC_START_TRY_BLOCK()
00112       for (int i=queue_num, num_attempts = 0; 
00113            num_attempts < static_cast<int>(num_queues);
00114            ++i, ++num_attempts) {
00115         const unsigned int real_i = i % num_queues;
00116         if (test_and_get (real_i, cnd, task, (real_i==queue_num))) break;
00117       }
00118 
00119       PFUNC_END_TRY_BLOCK()
00120       PFUNC_CATCH_AND_RETHROW(task_queue_set,get)
00121 
00122       return task;
00123     }
00124 
00132     void put (queue_index_type queue_num, const value_type& value) {
00133       PFUNC_START_TRY_BLOCK()
00134       queue_type& queue = data[queue_num].queue;
00135       mutex& lock = data[queue_num].lock;
00136 
00137       lock.lock ();
00138       queue.push (value);
00139       lock.unlock ();
00140 
00141       PFUNC_END_TRY_BLOCK()
00142       PFUNC_CATCH_AND_RETHROW(task_queue_set,task_queue_set)
00143     }
00144   };
00145 } /* namespace detail */ } /* namespace pfunc */
00146 
00147 #endif /* PFUNC_FIFO_HPP */