00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033
00034
00035 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00036 #define TBB_PREVIEW_TUPLE 1
00037 #include "compat/tuple"
00038 #else
00039 #include <tuple>
00040 #endif
00041
00042 #include<list>
00043 #include<queue>
00044
00055 namespace tbb {
00056 namespace flow {
00057
00059 enum concurrency { unlimited = 0, serial = 1 };
00060
00061 namespace interface6 {
00062
00064 class continue_msg {};
00065
00066 template< typename T > class sender;
00067 template< typename T > class receiver;
00068 class continue_receiver;
00069
00071 template< typename T >
00072 class sender {
00073 public:
00075 typedef T output_type;
00076
00078 typedef receiver<T> successor_type;
00079
00080 virtual ~sender() {}
00081
00083 virtual bool register_successor( successor_type &r ) = 0;
00084
00086 virtual bool remove_successor( successor_type &r ) = 0;
00087
00089 virtual bool try_get( T & ) { return false; }
00090
00092 virtual bool try_reserve( T & ) { return false; }
00093
00095 virtual bool try_release( ) { return false; }
00096
00098 virtual bool try_consume( ) { return false; }
00099 };
00100
00102 template< typename T >
00103 class receiver {
00104 public:
00106 typedef T input_type;
00107
00109 typedef sender<T> predecessor_type;
00110
00112 virtual ~receiver() {}
00113
00115 virtual bool try_put( const T& t ) = 0;
00116
00118 virtual bool register_predecessor( predecessor_type & ) { return false; }
00119
00121 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00122 };
00123
00125
00126 class continue_receiver : public receiver< continue_msg > {
00127 public:
00128
00130 typedef continue_msg input_type;
00131
00133 typedef sender< continue_msg > predecessor_type;
00134
00136 continue_receiver( int number_of_predecessors = 0 ) {
00137 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00138 my_current_count = 0;
00139 }
00140
00142 continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00143 my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00144 my_current_count = 0;
00145 }
00146
00148 virtual ~continue_receiver() { }
00149
00151 bool register_predecessor( predecessor_type & ) {
00152 spin_mutex::scoped_lock l(my_mutex);
00153 ++my_predecessor_count;
00154 return true;
00155 }
00156
00158
00161 bool remove_predecessor( predecessor_type & ) {
00162 spin_mutex::scoped_lock l(my_mutex);
00163 --my_predecessor_count;
00164 return true;
00165 }
00166
00168
00170 bool try_put( const input_type & ) {
00171 {
00172 spin_mutex::scoped_lock l(my_mutex);
00173 if ( ++my_current_count < my_predecessor_count )
00174 return true;
00175 else
00176 my_current_count = 0;
00177 }
00178 execute();
00179 return true;
00180 }
00181
00182 protected:
00183 spin_mutex my_mutex;
00184 int my_predecessor_count;
00185 int my_current_count;
00186 int my_initial_predecessor_count;
00187
00189
00191 virtual void execute() = 0;
00192 };
00193
00194 #include "internal/_flow_graph_impl.h"
00195 using namespace internal::graph_policy_namespace;
00196
00197 class graph;
00198 class graph_node;
00199
00200 template <typename GraphContainerType, typename GraphNodeType>
00201 class graph_iterator {
00202 friend class graph;
00203 friend class graph_node;
00204 public:
00205 typedef size_t size_type;
00206 typedef GraphNodeType value_type;
00207 typedef GraphNodeType* pointer;
00208 typedef GraphNodeType& reference;
00209 typedef const GraphNodeType& const_reference;
00210 typedef std::forward_iterator_tag iterator_category;
00211
00213 graph_iterator() : my_graph(NULL), current_node(NULL) {}
00214
00216 graph_iterator(const graph_iterator& other) :
00217 my_graph(other.my_graph), current_node(other.current_node)
00218 {}
00219
00221 graph_iterator& operator=(const graph_iterator& other) {
00222 if (this != &other) {
00223 my_graph = other.my_graph;
00224 current_node = other.current_node;
00225 }
00226 return *this;
00227 }
00228
00230 reference operator*() const;
00231
00233 pointer operator->() const;
00234
00236 bool operator==(const graph_iterator& other) const {
00237 return ((my_graph == other.my_graph) && (current_node == other.current_node));
00238 }
00239
00241 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
00242
00244 graph_iterator& operator++() {
00245 internal_forward();
00246 return *this;
00247 }
00248
00250 graph_iterator operator++(int) {
00251 graph_iterator result = *this;
00252 operator++();
00253 return result;
00254 }
00255
00256 private:
00257
00258 GraphContainerType *my_graph;
00259
00260 pointer current_node;
00261
00263 graph_iterator(GraphContainerType *g, bool begin);
00264 void internal_forward();
00265 };
00266
00268
00269 class graph : tbb::internal::no_copy {
00270 friend class graph_node;
00271
00272 template< typename Body >
00273 class run_task : public task {
00274 public:
00275 run_task( Body& body ) : my_body(body) {}
00276 task *execute() {
00277 my_body();
00278 return NULL;
00279 }
00280 private:
00281 Body my_body;
00282 };
00283
00284 template< typename Receiver, typename Body >
00285 class run_and_put_task : public task {
00286 public:
00287 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00288 task *execute() {
00289 my_receiver.try_put( my_body() );
00290 return NULL;
00291 }
00292 private:
00293 Receiver &my_receiver;
00294 Body my_body;
00295 };
00296
00297 public:
00299 explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
00300 {
00301 own_context = true;
00302 cancelled = false;
00303 caught_exception = false;
00304 my_context = new task_group_context();
00305 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00306 my_root_task->set_ref_count(1);
00307 }
00308
00310 explicit graph(task_group_context& use_this_context) :
00311 my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
00312 {
00313 own_context = false;
00314 my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00315 my_root_task->set_ref_count(1);
00316 }
00317
00319
00320 ~graph() {
00321 wait_for_all();
00322 my_root_task->set_ref_count(0);
00323 task::destroy( *my_root_task );
00324 if (own_context) delete my_context;
00325 }
00326
00328
00330 void increment_wait_count() {
00331 if (my_root_task)
00332 my_root_task->increment_ref_count();
00333 }
00334
00336
00338 void decrement_wait_count() {
00339 if (my_root_task)
00340 my_root_task->decrement_ref_count();
00341 }
00342
00344
00346 template< typename Receiver, typename Body >
00347 void run( Receiver &r, Body body ) {
00348 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00349 run_and_put_task< Receiver, Body >( r, body ) );
00350 }
00351
00353
00355 template< typename Body >
00356 void run( Body body ) {
00357 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00358 run_task< Body >( body ) );
00359 }
00360
00362
00363 void wait_for_all() {
00364 cancelled = false;
00365 caught_exception = false;
00366 if (my_root_task) {
00367 #if TBB_USE_EXCEPTIONS
00368 try {
00369 #endif
00370 my_root_task->wait_for_all();
00371 cancelled = my_context->is_group_execution_cancelled();
00372 #if TBB_USE_EXCEPTIONS
00373 }
00374 catch(...) {
00375 my_root_task->set_ref_count(1);
00376 my_context->reset();
00377 caught_exception = true;
00378 cancelled = true;
00379 throw;
00380 }
00381 #endif
00382 my_root_task->set_ref_count(1);
00383 }
00384 }
00385
00387 task * root_task() {
00388 return my_root_task;
00389 }
00390
00391
00392 template<typename C, typename N>
00393 friend class graph_iterator;
00394
00395
00396 typedef graph_iterator<graph,graph_node> iterator;
00397 typedef graph_iterator<const graph,const graph_node> const_iterator;
00398
00399
00401 iterator begin() { return iterator(this, true); }
00403 iterator end() { return iterator(this, false); }
00405 const_iterator begin() const { return const_iterator(this, true); }
00407 const_iterator end() const { return const_iterator(this, false); }
00409 const_iterator cbegin() const { return const_iterator(this, true); }
00411 const_iterator cend() const { return const_iterator(this, false); }
00412
00414 bool is_cancelled() { return cancelled; }
00415 bool exception_thrown() { return caught_exception; }
00416
00417 private:
00418 task *my_root_task;
00419 task_group_context *my_context;
00420 bool own_context;
00421 bool cancelled;
00422 bool caught_exception;
00423
00424 graph_node *my_nodes, *my_nodes_last;
00425
00426 spin_mutex nodelist_mutex;
00427 void register_node(graph_node *n);
00428 void remove_node(graph_node *n);
00429
00430 };
00431
00432 template <typename C, typename N>
00433 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
00434 {
00435 if (begin) current_node = my_graph->my_nodes;
00436
00437 }
00438
00439 template <typename C, typename N>
00440 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
00441 __TBB_ASSERT(current_node, "graph_iterator at end");
00442 return *operator->();
00443 }
00444
00445 template <typename C, typename N>
00446 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
00447 return current_node;
00448 }
00449
00450
00451 template <typename C, typename N>
00452 void graph_iterator<C,N>::internal_forward() {
00453 if (current_node) current_node = current_node->next;
00454 }
00455
00457 class graph_node : tbb::internal::no_assign {
00458 friend class graph;
00459 template<typename C, typename N>
00460 friend class graph_iterator;
00461 protected:
00462 graph& my_graph;
00463 graph_node *next, *prev;
00464 public:
00465 graph_node(graph& g) : my_graph(g) {
00466 my_graph.register_node(this);
00467 }
00468 virtual ~graph_node() {
00469 my_graph.remove_node(this);
00470 }
00471 };
00472
00473 void graph::register_node(graph_node *n) {
00474 n->next = NULL;
00475 {
00476 spin_mutex::scoped_lock lock(nodelist_mutex);
00477 n->prev = my_nodes_last;
00478 if (my_nodes_last) my_nodes_last->next = n;
00479 my_nodes_last = n;
00480 if (!my_nodes) my_nodes = n;
00481 }
00482 }
00483
00484 void graph::remove_node(graph_node *n) {
00485 {
00486 spin_mutex::scoped_lock lock(nodelist_mutex);
00487 __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
00488 if (n->prev) n->prev->next = n->next;
00489 if (n->next) n->next->prev = n->prev;
00490 if (my_nodes_last == n) my_nodes_last = n->prev;
00491 if (my_nodes == n) my_nodes = n->next;
00492 }
00493 n->prev = n->next = NULL;
00494 }
00495
00496 #include "internal/_flow_graph_node_impl.h"
00497
00499 template < typename Output >
00500 class source_node : public graph_node, public sender< Output > {
00501 using graph_node::my_graph;
00502 public:
00504 typedef Output output_type;
00505
00507 typedef receiver< Output > successor_type;
00508
00510 template< typename Body >
00511 source_node( graph &g, Body body, bool is_active = true )
00512 : graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00513 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00514 my_reserved(false), my_has_cached_item(false)
00515 {
00516 my_successors.set_owner(this);
00517 }
00518
00520 source_node( const source_node& src ) :
00521 graph_node(src.my_graph), sender<Output>(),
00522 my_root_task( src.my_root_task), my_active(src.init_my_active),
00523 init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00524 my_reserved(false), my_has_cached_item(false)
00525 {
00526 my_successors.set_owner(this);
00527 }
00528
00530 ~source_node() { delete my_body; }
00531
00533 bool register_successor( receiver<output_type> &r ) {
00534 spin_mutex::scoped_lock lock(my_mutex);
00535 my_successors.register_successor(r);
00536 if ( my_active )
00537 spawn_put();
00538 return true;
00539 }
00540
00542 bool remove_successor( receiver<output_type> &r ) {
00543 spin_mutex::scoped_lock lock(my_mutex);
00544 my_successors.remove_successor(r);
00545 return true;
00546 }
00547
00549 bool try_get( output_type &v ) {
00550 spin_mutex::scoped_lock lock(my_mutex);
00551 if ( my_reserved )
00552 return false;
00553
00554 if ( my_has_cached_item ) {
00555 v = my_cached_item;
00556 my_has_cached_item = false;
00557 } else if ( (*my_body)(v) == false ) {
00558 return false;
00559 }
00560 return true;
00561 }
00562
00564 bool try_reserve( output_type &v ) {
00565 spin_mutex::scoped_lock lock(my_mutex);
00566 if ( my_reserved ) {
00567 return false;
00568 }
00569
00570 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00571 my_has_cached_item = true;
00572
00573 if ( my_has_cached_item ) {
00574 v = my_cached_item;
00575 my_reserved = true;
00576 return true;
00577 } else {
00578 return false;
00579 }
00580 }
00581
00583
00584 bool try_release( ) {
00585 spin_mutex::scoped_lock lock(my_mutex);
00586 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00587 my_reserved = false;
00588 if(!my_successors.empty())
00589 spawn_put();
00590 return true;
00591 }
00592
00594 bool try_consume( ) {
00595 spin_mutex::scoped_lock lock(my_mutex);
00596 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00597 my_reserved = false;
00598 my_has_cached_item = false;
00599 if ( !my_successors.empty() ) {
00600 spawn_put();
00601 }
00602 return true;
00603 }
00604
00606 void activate() {
00607 spin_mutex::scoped_lock lock(my_mutex);
00608 my_active = true;
00609 if ( !my_successors.empty() )
00610 spawn_put();
00611 }
00612
00613 private:
00614 task *my_root_task;
00615 spin_mutex my_mutex;
00616 bool my_active;
00617 bool init_my_active;
00618 internal::source_body<output_type> *my_body;
00619 internal::broadcast_cache< output_type > my_successors;
00620 bool my_reserved;
00621 bool my_has_cached_item;
00622 output_type my_cached_item;
00623
00624 friend class internal::source_task< source_node< output_type > >;
00625
00627 void apply_body( ) {
00628 output_type v;
00629 if ( try_reserve(v) == false )
00630 return;
00631
00632 if ( my_successors.try_put( v ) )
00633 try_consume();
00634 else
00635 try_release();
00636 }
00637
00639 void spawn_put( ) {
00640 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00641 internal::source_task< source_node< output_type > >( *this ) );
00642 }
00643 };
00644
00646 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00647 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00648 using graph_node::my_graph;
00649 public:
00650 typedef Input input_type;
00651 typedef Output output_type;
00652 typedef sender< input_type > predecessor_type;
00653 typedef receiver< output_type > successor_type;
00654 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00655 typedef internal::function_output<output_type> fOutput_type;
00656
00658 template< typename Body >
00659 function_node( graph &g, size_t concurrency, Body body ) :
00660 graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body)
00661 {}
00662
00664 function_node( const function_node& src ) :
00665 graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
00666 fOutput_type()
00667 {}
00668
00669 bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00670
00671 protected:
00672 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00673 };
00674
00676 template < typename Input, typename Output, typename Allocator >
00677 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00678 using graph_node::my_graph;
00679 public:
00680 typedef Input input_type;
00681 typedef Output output_type;
00682 typedef sender< input_type > predecessor_type;
00683 typedef receiver< output_type > successor_type;
00684 typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00685 typedef internal::function_input_queue<input_type, Allocator> queue_type;
00686 typedef internal::function_output<output_type> fOutput_type;
00687
00689 template< typename Body >
00690 function_node( graph &g, size_t concurrency, Body body ) :
00691 graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
00692 {}
00693
00695 function_node( const function_node& src ) :
00696 graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
00697 {}
00698
00699 bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00700
00701 protected:
00702 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00703 };
00704
00705 #include "tbb/internal/_flow_graph_types_impl.h"
00706
00708
00709 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00710 class multifunction_node :
00711 public graph_node,
00712 public internal::multifunction_input
00713 <
00714 Input,
00715 typename internal::wrap_tuple_elements<
00716 std::tuple_size<Output>::value,
00717 internal::function_output,
00718 Output
00719 >::type,
00720 Allocator
00721 > {
00722 using graph_node::my_graph;
00723 private:
00724 static const int N = std::tuple_size<Output>::value;
00725 public:
00726 typedef Input input_type;
00727 typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type output_ports_type;
00728 private:
00729 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00730 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00731 public:
00732 template<typename Body>
00733 multifunction_node( graph &g, size_t concurrency, Body body ) :
00734 graph_node(g), base_type(g,concurrency, body)
00735 {}
00736 multifunction_node( const multifunction_node &other) :
00737 graph_node(other.my_graph), base_type(other)
00738 {}
00739
00740 };
00741
00742 template < typename Input, typename Output, typename Allocator >
00743 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
00744 typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00745 using graph_node::my_graph;
00746 static const int N = std::tuple_size<Output>::value;
00747 public:
00748 typedef Input input_type;
00749 typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type output_ports_type;
00750 private:
00751 typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00752 typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00753 public:
00754 template<typename Body>
00755 multifunction_node( graph &g, size_t concurrency, Body body) :
00756 graph_node(g), base_type(g,concurrency, body, new queue_type())
00757 {}
00758 multifunction_node( const multifunction_node &other) :
00759 graph_node(other.my_graph), base_type(other, new queue_type())
00760 {}
00761 };
00762
00764
00765
00766 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00767 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
00768 static const int N = std::tuple_size<TupleType>::value;
00769 typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
00770 public:
00771 typedef typename base_type::output_ports_type output_ports_type;
00772 private:
00773 struct splitting_body {
00774 void operator()(const TupleType& t, output_ports_type &p) {
00775 internal::emit_element<N>::emit_this(t, p);
00776 }
00777 };
00778 public:
00779 typedef TupleType input_type;
00780 typedef Allocator allocator_type;
00781 split_node(graph &g) : base_type(g, unlimited, splitting_body()) {}
00782 split_node( const split_node & other) : base_type(other) {}
00783 };
00784
00786 template <typename Output>
00787 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00788 using graph_node::my_graph;
00789 public:
00790 typedef continue_msg input_type;
00791 typedef Output output_type;
00792 typedef sender< input_type > predecessor_type;
00793 typedef receiver< output_type > successor_type;
00794 typedef internal::function_output<output_type> fOutput_type;
00795
00797 template <typename Body >
00798 continue_node( graph &g, Body body ) :
00799 graph_node(g), internal::continue_input<output_type>( g, body )
00800 {}
00801
00803 template <typename Body >
00804 continue_node( graph &g, int number_of_predecessors, Body body ) :
00805 graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
00806 {}
00807
00809 continue_node( const continue_node& src ) :
00810 graph_node(src.my_graph), internal::continue_input<output_type>(src),
00811 internal::function_output<Output>()
00812 {}
00813
00814 bool try_put(const input_type &i) { return internal::continue_input<Output>::try_put(i); }
00815
00816 protected:
00817 internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00818 };
00819
00820 template< typename T >
00821 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00822 using graph_node::my_graph;
00823 public:
00824 typedef T input_type;
00825 typedef T output_type;
00826 typedef sender< input_type > predecessor_type;
00827 typedef receiver< output_type > successor_type;
00828
00829 overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
00830 my_successors.set_owner( this );
00831 }
00832
00833
00834 overwrite_node( const overwrite_node& src ) :
00835 graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
00836 {
00837 my_successors.set_owner( this );
00838 }
00839
00840 ~overwrite_node() {}
00841
00842 bool register_successor( successor_type &s ) {
00843 spin_mutex::scoped_lock l( my_mutex );
00844 if ( my_buffer_is_valid ) {
00845
00846 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
00847
00848 my_successors.register_successor( s );
00849 return true;
00850 } else {
00851
00852 return false;
00853 }
00854 } else {
00855
00856 my_successors.register_successor( s );
00857 return true;
00858 }
00859 }
00860
00861 bool remove_successor( successor_type &s ) {
00862 spin_mutex::scoped_lock l( my_mutex );
00863 my_successors.remove_successor(s);
00864 return true;
00865 }
00866
00867 bool try_put( const T &v ) {
00868 spin_mutex::scoped_lock l( my_mutex );
00869 my_buffer = v;
00870 my_buffer_is_valid = true;
00871 my_successors.try_put(v);
00872 return true;
00873 }
00874
00875 bool try_get( T &v ) {
00876 spin_mutex::scoped_lock l( my_mutex );
00877 if ( my_buffer_is_valid ) {
00878 v = my_buffer;
00879 return true;
00880 } else {
00881 return false;
00882 }
00883 }
00884
00885 bool is_valid() {
00886 spin_mutex::scoped_lock l( my_mutex );
00887 return my_buffer_is_valid;
00888 }
00889
00890 void clear() {
00891 spin_mutex::scoped_lock l( my_mutex );
00892 my_buffer_is_valid = false;
00893 }
00894
00895 protected:
00896 spin_mutex my_mutex;
00897 internal::broadcast_cache< T, null_rw_mutex > my_successors;
00898 T my_buffer;
00899 bool my_buffer_is_valid;
00900 };
00901
00902 template< typename T >
00903 class write_once_node : public overwrite_node<T> {
00904 public:
00905 typedef T input_type;
00906 typedef T output_type;
00907 typedef sender< input_type > predecessor_type;
00908 typedef receiver< output_type > successor_type;
00909
00911 write_once_node(graph& g) : overwrite_node<T>(g) {}
00912
00914 write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00915
00916 bool try_put( const T &v ) {
00917 spin_mutex::scoped_lock l( this->my_mutex );
00918 if ( this->my_buffer_is_valid ) {
00919 return false;
00920 } else {
00921 this->my_buffer = v;
00922 this->my_buffer_is_valid = true;
00923 this->my_successors.try_put(v);
00924 return true;
00925 }
00926 }
00927 };
00928
00930 template <typename T>
00931 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00932 using graph_node::my_graph;
00933 internal::broadcast_cache<T> my_successors;
00934 public:
00935 typedef T input_type;
00936 typedef T output_type;
00937 typedef sender< input_type > predecessor_type;
00938 typedef receiver< output_type > successor_type;
00939
00940 broadcast_node(graph& g) : graph_node(g) {
00941 my_successors.set_owner( this );
00942 }
00943
00944
00945 broadcast_node( const broadcast_node& src ) :
00946 graph_node(src.my_graph), receiver<T>(), sender<T>()
00947 {
00948 my_successors.set_owner( this );
00949 }
00950
00952 virtual bool register_successor( receiver<T> &r ) {
00953 my_successors.register_successor( r );
00954 return true;
00955 }
00956
00958 virtual bool remove_successor( receiver<T> &r ) {
00959 my_successors.remove_successor( r );
00960 return true;
00961 }
00962
00963 bool try_put( const T &t ) {
00964 my_successors.try_put(t);
00965 return true;
00966 }
00967 };
00968
00969 #include "internal/_flow_graph_item_buffer_impl.h"
00970
00972 template <typename T, typename A=cache_aligned_allocator<T> >
00973 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00974 using graph_node::my_graph;
00975 public:
00976 typedef T input_type;
00977 typedef T output_type;
00978 typedef sender< input_type > predecessor_type;
00979 typedef receiver< output_type > successor_type;
00980 typedef buffer_node<T, A> my_class;
00981 protected:
00982 typedef size_t size_type;
00983 internal::round_robin_cache< T, null_rw_mutex > my_successors;
00984
00985 task *my_parent;
00986
00987 friend class internal::forward_task< buffer_node< T, A > >;
00988
00989 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00990 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00991
00992
00993 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00994 public:
00995 char type;
00996 T *elem;
00997 successor_type *r;
00998 buffer_operation(const T& e, op_type t) :
00999 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
01000 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
01001 };
01002
01003 bool forwarder_busy;
01004 typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
01005 friend class internal::aggregating_functor<my_class, buffer_operation>;
01006 internal::aggregator< my_handler, buffer_operation> my_aggregator;
01007
01008 virtual void handle_operations(buffer_operation *op_list) {
01009 buffer_operation *tmp;
01010 bool try_forwarding=false;
01011 while (op_list) {
01012 tmp = op_list;
01013 op_list = op_list->next;
01014 switch (tmp->type) {
01015 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
01016 case rem_succ: internal_rem_succ(tmp); break;
01017 case req_item: internal_pop(tmp); break;
01018 case res_item: internal_reserve(tmp); break;
01019 case rel_res: internal_release(tmp); try_forwarding = true; break;
01020 case con_res: internal_consume(tmp); try_forwarding = true; break;
01021 case put_item: internal_push(tmp); try_forwarding = true; break;
01022 case try_fwd: internal_forward(tmp); break;
01023 }
01024 }
01025 if (try_forwarding && !forwarder_busy) {
01026 forwarder_busy = true;
01027 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
01028 }
01029 }
01030
01032 virtual void forward() {
01033 buffer_operation op_data(try_fwd);
01034 do {
01035 op_data.status = WAIT;
01036 my_aggregator.execute(&op_data);
01037 } while (op_data.status == SUCCEEDED);
01038 }
01039
01041 virtual void internal_reg_succ(buffer_operation *op) {
01042 my_successors.register_successor(*(op->r));
01043 __TBB_store_with_release(op->status, SUCCEEDED);
01044 }
01045
01047 virtual void internal_rem_succ(buffer_operation *op) {
01048 my_successors.remove_successor(*(op->r));
01049 __TBB_store_with_release(op->status, SUCCEEDED);
01050 }
01051
01053 virtual void internal_forward(buffer_operation *op) {
01054 T i_copy;
01055 bool success = false;
01056 size_type counter = my_successors.size();
01057
01058 while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
01059 this->fetch_back(i_copy);
01060 if( my_successors.try_put(i_copy) ) {
01061 this->invalidate_back();
01062 --(this->my_tail);
01063 success = true;
01064 }
01065 --counter;
01066 }
01067 if (success && !counter)
01068 __TBB_store_with_release(op->status, SUCCEEDED);
01069 else {
01070 __TBB_store_with_release(op->status, FAILED);
01071 forwarder_busy = false;
01072 }
01073 }
01074
01075 virtual void internal_push(buffer_operation *op) {
01076 this->push_back(*(op->elem));
01077 __TBB_store_with_release(op->status, SUCCEEDED);
01078 }
01079
01080 virtual void internal_pop(buffer_operation *op) {
01081 if(this->pop_back(*(op->elem))) {
01082 __TBB_store_with_release(op->status, SUCCEEDED);
01083 }
01084 else {
01085 __TBB_store_with_release(op->status, FAILED);
01086 }
01087 }
01088
01089 virtual void internal_reserve(buffer_operation *op) {
01090 if(this->reserve_front(*(op->elem))) {
01091 __TBB_store_with_release(op->status, SUCCEEDED);
01092 }
01093 else {
01094 __TBB_store_with_release(op->status, FAILED);
01095 }
01096 }
01097
01098 virtual void internal_consume(buffer_operation *op) {
01099 this->consume_front();
01100 __TBB_store_with_release(op->status, SUCCEEDED);
01101 }
01102
01103 virtual void internal_release(buffer_operation *op) {
01104 this->release_front();
01105 __TBB_store_with_release(op->status, SUCCEEDED);
01106 }
01107
01108 public:
01110 buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
01111 my_parent( g.root_task() ), forwarder_busy(false) {
01112 my_successors.set_owner(this);
01113 my_aggregator.initialize_handler(my_handler(this));
01114 }
01115
01117 buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
01118 reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
01119 my_parent( src.my_parent ) {
01120 forwarder_busy = false;
01121 my_successors.set_owner(this);
01122 my_aggregator.initialize_handler(my_handler(this));
01123 }
01124
01125 virtual ~buffer_node() {}
01126
01127
01128
01129
01130
01132
01133 bool register_successor( receiver<output_type> &r ) {
01134 buffer_operation op_data(reg_succ);
01135 op_data.r = &r;
01136 my_aggregator.execute(&op_data);
01137 return true;
01138 }
01139
01141
01143 bool remove_successor( receiver<output_type> &r ) {
01144 r.remove_predecessor(*this);
01145 buffer_operation op_data(rem_succ);
01146 op_data.r = &r;
01147 my_aggregator.execute(&op_data);
01148 return true;
01149 }
01150
01152
01154 bool try_get( T &v ) {
01155 buffer_operation op_data(req_item);
01156 op_data.elem = &v;
01157 my_aggregator.execute(&op_data);
01158 return (op_data.status==SUCCEEDED);
01159 }
01160
01162
01164 bool try_reserve( T &v ) {
01165 buffer_operation op_data(res_item);
01166 op_data.elem = &v;
01167 my_aggregator.execute(&op_data);
01168 return (op_data.status==SUCCEEDED);
01169 }
01170
01172
01173 bool try_release() {
01174 buffer_operation op_data(rel_res);
01175 my_aggregator.execute(&op_data);
01176 return true;
01177 }
01178
01180
01181 bool try_consume() {
01182 buffer_operation op_data(con_res);
01183 my_aggregator.execute(&op_data);
01184 return true;
01185 }
01186
01188
01189 bool try_put(const T &t) {
01190 buffer_operation op_data(t, put_item);
01191 my_aggregator.execute(&op_data);
01192 return true;
01193 }
01194 };
01195
01197 template <typename T, typename A=cache_aligned_allocator<T> >
01198 class queue_node : public buffer_node<T, A> {
01199 protected:
01200 typedef typename buffer_node<T, A>::size_type size_type;
01201 typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01202
01203 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01204
01206 void internal_forward(queue_operation *op) {
01207 T i_copy;
01208 bool success = false;
01209 size_type counter = this->my_successors.size();
01210 if (this->my_reserved || !this->item_valid(this->my_head)) {
01211 __TBB_store_with_release(op->status, FAILED);
01212 this->forwarder_busy = false;
01213 return;
01214 }
01215
01216 while (counter>0 && this->item_valid(this->my_head)) {
01217 this->fetch_front(i_copy);
01218 if(this->my_successors.try_put(i_copy)) {
01219 this->invalidate_front();
01220 ++(this->my_head);
01221 success = true;
01222 }
01223 --counter;
01224 }
01225 if (success && !counter)
01226 __TBB_store_with_release(op->status, SUCCEEDED);
01227 else {
01228 __TBB_store_with_release(op->status, FAILED);
01229 this->forwarder_busy = false;
01230 }
01231 }
01232
01233 void internal_pop(queue_operation *op) {
01234 if ( this->my_reserved || !this->item_valid(this->my_head)){
01235 __TBB_store_with_release(op->status, FAILED);
01236 }
01237 else {
01238 this->pop_front(*(op->elem));
01239 __TBB_store_with_release(op->status, SUCCEEDED);
01240 }
01241 }
01242 void internal_reserve(queue_operation *op) {
01243 if (this->my_reserved || !this->item_valid(this->my_head)) {
01244 __TBB_store_with_release(op->status, FAILED);
01245 }
01246 else {
01247 this->my_reserved = true;
01248 this->fetch_front(*(op->elem));
01249 this->invalidate_front();
01250 __TBB_store_with_release(op->status, SUCCEEDED);
01251 }
01252 }
01253 void internal_consume(queue_operation *op) {
01254 this->consume_front();
01255 __TBB_store_with_release(op->status, SUCCEEDED);
01256 }
01257
01258 public:
01259 typedef T input_type;
01260 typedef T output_type;
01261 typedef sender< input_type > predecessor_type;
01262 typedef receiver< output_type > successor_type;
01263
01265 queue_node( graph &g ) : buffer_node<T, A>(g) {}
01266
01268 queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01269 };
01270
01272 template< typename T, typename A=cache_aligned_allocator<T> >
01273 class sequencer_node : public queue_node<T, A> {
01274 internal::function_body< T, size_t > *my_sequencer;
01275 public:
01276 typedef T input_type;
01277 typedef T output_type;
01278 typedef sender< input_type > predecessor_type;
01279 typedef receiver< output_type > successor_type;
01280
01282 template< typename Sequencer >
01283 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01284 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01285
01287 sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01288 my_sequencer( src.my_sequencer->clone() ) {}
01289
01291 ~sequencer_node() { delete my_sequencer; }
01292 protected:
01293 typedef typename buffer_node<T, A>::size_type size_type;
01294 typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01295
01296 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01297
01298 private:
01299 void internal_push(sequencer_operation *op) {
01300 size_type tag = (*my_sequencer)(*(op->elem));
01301
01302 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01303
01304 if(this->size() > this->capacity())
01305 this->grow_my_array(this->size());
01306 this->item(tag) = std::make_pair( *(op->elem), true );
01307 __TBB_store_with_release(op->status, SUCCEEDED);
01308 }
01309 };
01310
01312 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01313 class priority_queue_node : public buffer_node<T, A> {
01314 public:
01315 typedef T input_type;
01316 typedef T output_type;
01317 typedef sender< input_type > predecessor_type;
01318 typedef receiver< output_type > successor_type;
01319
01321 priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01322
01324 priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01325
01326 protected:
01327 typedef typename buffer_node<T, A>::size_type size_type;
01328 typedef typename buffer_node<T, A>::item_type item_type;
01329 typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01330
01331 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01332
01333 void handle_operations(prio_operation *op_list) {
01334 prio_operation *tmp ;
01335 bool try_forwarding=false;
01336 while (op_list) {
01337 tmp = op_list;
01338 op_list = op_list->next;
01339 switch (tmp->type) {
01340 case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01341 case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01342 case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01343 case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01344 case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01345 case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01346 case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01347 case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01348 }
01349 }
01350
01351 if (mark<this->my_tail) heapify();
01352 if (try_forwarding && !this->forwarder_busy) {
01353 this->forwarder_busy = true;
01354 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01355 }
01356 }
01357
01359 void internal_forward(prio_operation *op) {
01360 T i_copy;
01361 bool success = false;
01362 size_type counter = this->my_successors.size();
01363
01364 if (this->my_reserved || this->my_tail == 0) {
01365 __TBB_store_with_release(op->status, FAILED);
01366 this->forwarder_busy = false;
01367 return;
01368 }
01369
01370 while (counter>0 && this->my_tail > 0) {
01371 i_copy = this->my_array[0].first;
01372 bool msg = this->my_successors.try_put(i_copy);
01373 if ( msg == true ) {
01374 if (mark == this->my_tail) --mark;
01375 --(this->my_tail);
01376 this->my_array[0].first=this->my_array[this->my_tail].first;
01377 if (this->my_tail > 1)
01378 reheap();
01379 success = true;
01380 }
01381 --counter;
01382 }
01383 if (success && !counter)
01384 __TBB_store_with_release(op->status, SUCCEEDED);
01385 else {
01386 __TBB_store_with_release(op->status, FAILED);
01387 this->forwarder_busy = false;
01388 }
01389 }
01390
01391 void internal_push(prio_operation *op) {
01392 if ( this->my_tail >= this->my_array_size )
01393 this->grow_my_array( this->my_tail + 1 );
01394 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01395 ++(this->my_tail);
01396 __TBB_store_with_release(op->status, SUCCEEDED);
01397 }
01398 void internal_pop(prio_operation *op) {
01399 if ( this->my_reserved == true || this->my_tail == 0 ) {
01400 __TBB_store_with_release(op->status, FAILED);
01401 }
01402 else {
01403 if (mark<this->my_tail &&
01404 compare(this->my_array[0].first,
01405 this->my_array[this->my_tail-1].first)) {
01406
01407
01408 *(op->elem) = this->my_array[this->my_tail-1].first;
01409 --(this->my_tail);
01410 __TBB_store_with_release(op->status, SUCCEEDED);
01411 }
01412 else {
01413 *(op->elem) = this->my_array[0].first;
01414 if (mark == this->my_tail) --mark;
01415 --(this->my_tail);
01416 __TBB_store_with_release(op->status, SUCCEEDED);
01417 this->my_array[0].first=this->my_array[this->my_tail].first;
01418 if (this->my_tail > 1)
01419 reheap();
01420 }
01421 }
01422 }
01423 void internal_reserve(prio_operation *op) {
01424 if (this->my_reserved == true || this->my_tail == 0) {
01425 __TBB_store_with_release(op->status, FAILED);
01426 }
01427 else {
01428 this->my_reserved = true;
01429 *(op->elem) = reserved_item = this->my_array[0].first;
01430 if (mark == this->my_tail) --mark;
01431 --(this->my_tail);
01432 __TBB_store_with_release(op->status, SUCCEEDED);
01433 this->my_array[0].first = this->my_array[this->my_tail].first;
01434 if (this->my_tail > 1)
01435 reheap();
01436 }
01437 }
01438 void internal_consume(prio_operation *op) {
01439 this->my_reserved = false;
01440 __TBB_store_with_release(op->status, SUCCEEDED);
01441 }
01442 void internal_release(prio_operation *op) {
01443 if (this->my_tail >= this->my_array_size)
01444 this->grow_my_array( this->my_tail + 1 );
01445 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01446 ++(this->my_tail);
01447 this->my_reserved = false;
01448 __TBB_store_with_release(op->status, SUCCEEDED);
01449 heapify();
01450 }
01451 private:
01452 Compare compare;
01453 size_type mark;
01454 input_type reserved_item;
01455
01456 void heapify() {
01457 if (!mark) mark = 1;
01458 for (; mark<this->my_tail; ++mark) {
01459 size_type cur_pos = mark;
01460 input_type to_place = this->my_array[mark].first;
01461 do {
01462 size_type parent = (cur_pos-1)>>1;
01463 if (!compare(this->my_array[parent].first, to_place))
01464 break;
01465 this->my_array[cur_pos].first = this->my_array[parent].first;
01466 cur_pos = parent;
01467 } while( cur_pos );
01468 this->my_array[cur_pos].first = to_place;
01469 }
01470 }
01471
01472 void reheap() {
01473 size_type cur_pos=0, child=1;
01474 while (child < mark) {
01475 size_type target = child;
01476 if (child+1<mark &&
01477 compare(this->my_array[child].first,
01478 this->my_array[child+1].first))
01479 ++target;
01480
01481 if (compare(this->my_array[target].first,
01482 this->my_array[this->my_tail].first))
01483 break;
01484 this->my_array[cur_pos].first = this->my_array[target].first;
01485 cur_pos = target;
01486 child = (cur_pos<<1)+1;
01487 }
01488 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01489 }
01490 };
01491
01493
01496 template< typename T >
01497 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01498 using graph_node::my_graph;
01499 public:
01500 typedef T input_type;
01501 typedef T output_type;
01502 typedef sender< input_type > predecessor_type;
01503 typedef receiver< output_type > successor_type;
01504
01505 private:
01506 task *my_root_task;
01507 size_t my_threshold;
01508 size_t my_count;
01509 internal::predecessor_cache< T > my_predecessors;
01510 spin_mutex my_mutex;
01511 internal::broadcast_cache< T > my_successors;
01512 int init_decrement_predecessors;
01513
01514 friend class internal::forward_task< limiter_node<T> >;
01515
01516
01517 friend class internal::decrementer< limiter_node<T> >;
01518
01519 void decrement_counter() {
01520 input_type v;
01521
01522
01523 if ( my_predecessors.get_item( v ) == false
01524 || my_successors.try_put(v) == false ) {
01525 spin_mutex::scoped_lock lock(my_mutex);
01526 --my_count;
01527 if ( !my_predecessors.empty() )
01528 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01529 internal::forward_task< limiter_node<T> >( *this ) );
01530 }
01531 }
01532
01533 void forward() {
01534 {
01535 spin_mutex::scoped_lock lock(my_mutex);
01536 if ( my_count < my_threshold )
01537 ++my_count;
01538 else
01539 return;
01540 }
01541 decrement_counter();
01542 }
01543
01544 public:
01546 internal::decrementer< limiter_node<T> > decrement;
01547
01549 limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01550 graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01551 init_decrement_predecessors(num_decrement_predecessors),
01552 decrement(num_decrement_predecessors)
01553 {
01554 my_predecessors.set_owner(this);
01555 my_successors.set_owner(this);
01556 decrement.set_owner(this);
01557 }
01558
01560 limiter_node( const limiter_node& src ) :
01561 graph_node(src.my_graph), receiver<T>(), sender<T>(),
01562 my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01563 init_decrement_predecessors(src.init_decrement_predecessors),
01564 decrement(src.init_decrement_predecessors)
01565 {
01566 my_predecessors.set_owner(this);
01567 my_successors.set_owner(this);
01568 decrement.set_owner(this);
01569 }
01570
01572 bool register_successor( receiver<output_type> &r ) {
01573 my_successors.register_successor(r);
01574 return true;
01575 }
01576
01578
01579 bool remove_successor( receiver<output_type> &r ) {
01580 r.remove_predecessor(*this);
01581 my_successors.remove_successor(r);
01582 return true;
01583 }
01584
01586 bool try_put( const T &t ) {
01587 {
01588 spin_mutex::scoped_lock lock(my_mutex);
01589 if ( my_count >= my_threshold )
01590 return false;
01591 else
01592 ++my_count;
01593 }
01594
01595 bool msg = my_successors.try_put(t);
01596
01597 if ( msg != true ) {
01598 spin_mutex::scoped_lock lock(my_mutex);
01599 --my_count;
01600 if ( !my_predecessors.empty() )
01601 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01602 internal::forward_task< limiter_node<T> >( *this ) );
01603 }
01604
01605 return msg;
01606 }
01607
01609 bool register_predecessor( predecessor_type &src ) {
01610 spin_mutex::scoped_lock lock(my_mutex);
01611 my_predecessors.add( src );
01612 if ( my_count < my_threshold && !my_successors.empty() )
01613 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01614 internal::forward_task< limiter_node<T> >( *this ) );
01615 return true;
01616 }
01617
01619 bool remove_predecessor( predecessor_type &src ) {
01620 my_predecessors.remove( src );
01621 return true;
01622 }
01623 };
01624
01625 #include "internal/_flow_graph_join_impl.h"
01626
01627 using internal::reserving_port;
01628 using internal::queueing_port;
01629 using internal::tag_matching_port;
01630 using internal::input_port;
01631 using internal::tag_value;
01632 using internal::NO_TAG;
01633
01634 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01635
01636 template<typename OutputTuple>
01637 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01638 private:
01639 static const int N = std::tuple_size<OutputTuple>::value;
01640 typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01641 public:
01642 typedef OutputTuple output_type;
01643 typedef typename unfolded_type::input_ports_type input_ports_type;
01644 join_node(graph &g) : unfolded_type(g) { }
01645 join_node(const join_node &other) : unfolded_type(other) {}
01646 };
01647
01648 template<typename OutputTuple>
01649 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01650 private:
01651 static const int N = std::tuple_size<OutputTuple>::value;
01652 typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01653 public:
01654 typedef OutputTuple output_type;
01655 typedef typename unfolded_type::input_ports_type input_ports_type;
01656 join_node(graph &g) : unfolded_type(g) { }
01657 join_node(const join_node &other) : unfolded_type(other) {}
01658 };
01659
01660
01661 template<typename OutputTuple>
01662 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01663 tag_matching_port, OutputTuple, tag_matching> {
01664 private:
01665 static const int N = std::tuple_size<OutputTuple>::value;
01666 typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01667 public:
01668 typedef OutputTuple output_type;
01669 typedef typename unfolded_type::input_ports_type input_ports_type;
01670 template<typename B0, typename B1>
01671 join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01672 template<typename B0, typename B1, typename B2>
01673 join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01674 template<typename B0, typename B1, typename B2, typename B3>
01675 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01676 template<typename B0, typename B1, typename B2, typename B3, typename B4>
01677 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01678 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01679 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01680 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01681 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01682 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01683 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01684 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01685 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01686 template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01687 join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01688 join_node(const join_node &other) : unfolded_type(other) {}
01689 };
01690
01691 #if TBB_PREVIEW_GRAPH_NODES
01692
01693 #include "internal/_flow_graph_or_impl.h"
01694
01695 template<typename InputTuple>
01696 class or_node : public internal::unfolded_or_node<InputTuple> {
01697 private:
01698 static const int N = std::tuple_size<InputTuple>::value;
01699 public:
01700 typedef typename internal::or_output_type<InputTuple>::type output_type;
01701 typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01702 or_node(graph& g) : unfolded_type(g) { }
01703
01704 or_node( const or_node& other ) : unfolded_type(other) { }
01705 };
01706 #endif // TBB_PREVIEW_GRAPH_NODES
01707
01709 template< typename T >
01710 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01711 p.register_successor( s );
01712 }
01713
01715 template< typename T >
01716 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01717 p.remove_successor( s );
01718 }
01719
01721 template< typename Body, typename Node >
01722 Body copy_body( Node &n ) {
01723 return n.template copy_function_object<Body>();
01724 }
01725
01726 }
01727
01728 using interface6::graph;
01729 using interface6::graph_node;
01730 using interface6::continue_msg;
01731 using interface6::sender;
01732 using interface6::receiver;
01733 using interface6::continue_receiver;
01734
01735 using interface6::source_node;
01736 using interface6::function_node;
01737 using interface6::multifunction_node;
01738 using interface6::split_node;
01739 using interface6::internal::output_port;
01740 #if TBB_PREVIEW_GRAPH_NODES
01741 using interface6::or_node;
01742 #endif
01743 using interface6::continue_node;
01744 using interface6::overwrite_node;
01745 using interface6::write_once_node;
01746 using interface6::broadcast_node;
01747 using interface6::buffer_node;
01748 using interface6::queue_node;
01749 using interface6::sequencer_node;
01750 using interface6::priority_queue_node;
01751 using interface6::limiter_node;
01752 using namespace interface6::internal::graph_policy_namespace;
01753 using interface6::join_node;
01754 using interface6::input_port;
01755 using interface6::copy_body;
01756 using interface6::make_edge;
01757 using interface6::remove_edge;
01758 using interface6::internal::NO_TAG;
01759 using interface6::internal::tag_value;
01760
01761 }
01762 }
01763
01764 #endif // __TBB_flow_graph_H