flow_graph.h

00001 /*
00002     Copyright 2005-2012 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_flow_graph_H
00022 #define __TBB_flow_graph_H
00023 
00024 #include "tbb_stddef.h"
00025 #include "atomic.h"
00026 #include "spin_mutex.h"
00027 #include "null_mutex.h"
00028 #include "spin_rw_mutex.h"
00029 #include "null_rw_mutex.h"
00030 #include "task.h"
00031 #include "concurrent_vector.h"
00032 #include "internal/_aggregator_impl.h"
00033 
00034 // Use the compiler's own std::tuple (VC10 or a C++0x-enabled gcc) when available; otherwise fall back to TBB's compat version.
00035 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00036 #define TBB_PREVIEW_TUPLE 1
00037 #include "compat/tuple"
00038 #else
00039 #include <tuple>
00040 #endif
00041 
00042 #include <list>
00043 #include <queue>
00044 
00055 namespace tbb {
00056 namespace flow {
00057 
00059 enum concurrency { unlimited = 0, serial = 1 };
00060 
00061 namespace interface6 {
00062 
00064 class continue_msg {};
00065 
00066 template< typename T > class sender;
00067 template< typename T > class receiver;
00068 class continue_receiver;
00069 
00071 template< typename T >
00072 class sender {
00073 public:
00075     typedef T output_type;
00076 
00078     typedef receiver<T> successor_type;
00079 
00080     virtual ~sender() {}
00081 
00083     virtual bool register_successor( successor_type &r ) = 0;
00084 
00086     virtual bool remove_successor( successor_type &r ) = 0;
00087 
00089     virtual bool try_get( T & ) { return false; }
00090 
00092     virtual bool try_reserve( T & ) { return false; }
00093 
00095     virtual bool try_release( ) { return false; }
00096 
00098     virtual bool try_consume( ) { return false; }
00099 };
00100 
00102 template< typename T >
00103 class receiver {
00104 public:
00106     typedef T input_type;
00107 
00109     typedef sender<T> predecessor_type;
00110 
00112     virtual ~receiver() {}
00113 
00115     virtual bool try_put( const T& t ) = 0;
00116 
00118     virtual bool register_predecessor( predecessor_type & ) { return false; }
00119 
00121     virtual bool remove_predecessor( predecessor_type & ) { return false; }
00122 };
00123 
00125 
00126 class continue_receiver : public receiver< continue_msg > {
00127 public:
00128 
00130     typedef continue_msg input_type;
00131 
00133     typedef sender< continue_msg > predecessor_type;
00134 
00136     continue_receiver( int number_of_predecessors = 0 ) {
00137         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
00138         my_current_count = 0;
00139     }
00140 
00142     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
00143         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
00144         my_current_count = 0;
00145     }
00146 
00148     virtual ~continue_receiver() { }
00149 
00151     /* override */ bool register_predecessor( predecessor_type & ) {
00152         spin_mutex::scoped_lock l(my_mutex);
00153         ++my_predecessor_count;
00154         return true;
00155     }
00156 
00158 
00161     /* override */ bool remove_predecessor( predecessor_type & ) {
00162         spin_mutex::scoped_lock l(my_mutex);
00163         --my_predecessor_count;
00164         return true;
00165     }
00166 
00168 
00170     /* override */ bool try_put( const input_type & ) {
00171         {
00172             spin_mutex::scoped_lock l(my_mutex);
00173             if ( ++my_current_count < my_predecessor_count )
00174                 return true;
00175             else
00176                 my_current_count = 0;
00177         }
00178         execute();
00179         return true;
00180     }
00181 
00182 protected:
00183     spin_mutex my_mutex;
00184     int my_predecessor_count;
00185     int my_current_count;
00186     int my_initial_predecessor_count;
00187 
00189 
00191     virtual void execute() = 0;
00192 };
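
Usage sketch (not part of the header): a minimal example of deriving from continue_receiver; the class name barrier_receiver is illustrative. The receiver only invokes execute() once it has received as many continue_msg objects as it was told it has predecessors.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    // Fires its callback only after both of its (two) predecessors have signalled.
    class barrier_receiver : public tbb::flow::continue_receiver {
    public:
        barrier_receiver() : tbb::flow::continue_receiver(/*number_of_predecessors*/ 2) {}
    protected:
        /* override */ void execute() { std::printf("both predecessors signalled\n"); }
    };

    int main() {
        barrier_receiver r;
        r.try_put(tbb::flow::continue_msg());  // first signal: counted, execute() not yet called
        r.try_put(tbb::flow::continue_msg());  // second signal: threshold reached, execute() runs
        return 0;
    }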
00193 
00194 #include "internal/_flow_graph_impl.h"
00195 using namespace internal::graph_policy_namespace;
00196 
00197 class graph;
00198 class graph_node;
00199 
00200 template <typename GraphContainerType, typename GraphNodeType>
00201 class graph_iterator {
00202     friend class graph;
00203     friend class graph_node;
00204 public:
00205     typedef size_t size_type;
00206     typedef GraphNodeType value_type;
00207     typedef GraphNodeType* pointer;
00208     typedef GraphNodeType& reference;
00209     typedef const GraphNodeType& const_reference;
00210     typedef std::forward_iterator_tag iterator_category;
00211 
00213     graph_iterator() : my_graph(NULL), current_node(NULL) {}
00214 
00216     graph_iterator(const graph_iterator& other) :
00217         my_graph(other.my_graph), current_node(other.current_node)
00218     {}
00219 
00221     graph_iterator& operator=(const graph_iterator& other) {
00222         if (this != &other) {
00223             my_graph = other.my_graph;
00224             current_node = other.current_node;
00225         }
00226         return *this;
00227     }
00228 
00230     reference operator*() const;
00231 
00233     pointer operator->() const;
00234 
00236     bool operator==(const graph_iterator& other) const {
00237         return ((my_graph == other.my_graph) && (current_node == other.current_node));
00238     }
00239 
00241     bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
00242 
00244     graph_iterator& operator++() {
00245         internal_forward();
00246         return *this;
00247     }
00248 
00250     graph_iterator operator++(int) {
00251         graph_iterator result = *this;
00252         operator++();
00253         return result;
00254     }
00255 
00256 private:
00257     // the graph over which we are iterating
00258     GraphContainerType *my_graph;
00259     // pointer into my_graph's my_nodes list
00260     pointer current_node;
00261 
00263     graph_iterator(GraphContainerType *g, bool begin);
00264     void internal_forward();
00265 };
00266 
00268 
00269 class graph : tbb::internal::no_copy {
00270     friend class graph_node;
00271 
00272     template< typename Body >
00273     class run_task : public task {
00274     public:
00275         run_task( Body& body ) : my_body(body) {}
00276         task *execute() {
00277             my_body();
00278             return NULL;
00279         }
00280     private:
00281         Body my_body;
00282     };
00283 
00284     template< typename Receiver, typename Body >
00285     class run_and_put_task : public task {
00286     public:
00287         run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00288         task *execute() {
00289             my_receiver.try_put( my_body() );
00290             return NULL;
00291         }
00292     private:
00293         Receiver &my_receiver;
00294         Body my_body;
00295     };
00296 
00297 public:
00299     explicit graph() : my_nodes(NULL), my_nodes_last(NULL)
00300     {
00301         own_context = true;
00302         cancelled = false;
00303         caught_exception = false;
00304         my_context = new task_group_context();
00305         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00306         my_root_task->set_ref_count(1);
00307     }
00308 
00310     explicit graph(task_group_context& use_this_context) :
00311     my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL)
00312     {
00313         own_context = false;
00314         my_root_task = ( new ( task::allocate_root(*my_context) ) empty_task );
00315         my_root_task->set_ref_count(1);
00316     }
00317 
00319 
00320     ~graph() {
00321         wait_for_all();
00322         my_root_task->set_ref_count(0);
00323         task::destroy( *my_root_task );
00324         if (own_context) delete my_context;
00325     }
00326 
00328 
00330     void increment_wait_count() {
00331         if (my_root_task)
00332             my_root_task->increment_ref_count();
00333     }
00334 
00336 
00338     void decrement_wait_count() {
00339         if (my_root_task)
00340             my_root_task->decrement_ref_count();
00341     }
00342 
00344 
00346     template< typename Receiver, typename Body >
00347         void run( Receiver &r, Body body ) {
00348        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00349            run_and_put_task< Receiver, Body >( r, body ) );
00350     }
00351 
00353 
00355     template< typename Body >
00356     void run( Body body ) {
00357        task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00358            run_task< Body >( body ) );
00359     }
00360 
00362 
00363     void wait_for_all() {
00364         cancelled = false;
00365         caught_exception = false;
00366         if (my_root_task) {
00367 #if TBB_USE_EXCEPTIONS
00368             try {
00369 #endif
00370                 my_root_task->wait_for_all();
00371                 cancelled = my_context->is_group_execution_cancelled();
00372 #if TBB_USE_EXCEPTIONS
00373             }
00374             catch(...) {
00375                 my_root_task->set_ref_count(1);
00376                 my_context->reset();
00377                 caught_exception = true;
00378                 cancelled = true;
00379                 throw;
00380             }
00381 #endif
00382             my_root_task->set_ref_count(1);
00383         }
00384     }
00385 
00387     task * root_task() {
00388         return my_root_task;
00389     }
00390 
00391     // ITERATORS
00392     template<typename C, typename N>
00393     friend class graph_iterator;
00394 
00395     // Graph iterator typedefs
00396     typedef graph_iterator<graph,graph_node> iterator;
00397     typedef graph_iterator<const graph,const graph_node> const_iterator;
00398 
00399     // Graph iterator constructors
00401     iterator begin() { return iterator(this, true); }
00403     iterator end() { return iterator(this, false); }
00405     const_iterator begin() const { return const_iterator(this, true); }
00407     const_iterator end() const { return const_iterator(this, false); }
00409     const_iterator cbegin() const { return const_iterator(this, true); }
00411     const_iterator cend() const { return const_iterator(this, false); }
00412 
00414     bool is_cancelled() { return cancelled; }
00415     bool exception_thrown() { return caught_exception; }
00416 
00417 private:
00418     task *my_root_task;
00419     task_group_context *my_context;
00420     bool own_context;
00421     bool cancelled;
00422     bool caught_exception;
00423 
00424     graph_node *my_nodes, *my_nodes_last;
00425 
00426     spin_mutex nodelist_mutex;
00427     void register_node(graph_node *n); 
00428     void remove_node(graph_node *n);
00429 
00430 };
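
Illustrative sketch (not part of the header) of pushing work through a graph: graph::run() enqueues a task that is tracked by the graph's root task, and wait_for_all() blocks until every such task has completed. The body type is made up.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    struct hello_body {
        void operator()() const { std::printf("hello from a graph task\n"); }
    };

    int main() {
        tbb::flow::graph g;
        g.run(hello_body());   // enqueued as an additional child of the graph's root task
        g.wait_for_all();      // returns once the enqueued body has finished
        return 0;
    }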
00431 
00432 template <typename C, typename N>
00433 graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
00434 {
00435     if (begin) current_node = my_graph->my_nodes;
00436     // else it is an end iterator by default
00437 }
00438 
00439 template <typename C, typename N>
00440 typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
00441     __TBB_ASSERT(current_node, "graph_iterator at end");
00442     return *operator->();
00443 }
00444 
00445 template <typename C, typename N>
00446 typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const { 
00447     return current_node;
00448 }
00449 
00450 
00451 template <typename C, typename N>
00452 void graph_iterator<C,N>::internal_forward() {
00453     if (current_node) current_node = current_node->next;
00454 }
00455 
00457 class graph_node : tbb::internal::no_assign {
00458     friend class graph;
00459     template<typename C, typename N>
00460     friend class graph_iterator;
00461 protected:
00462     graph& my_graph;
00463     graph_node *next, *prev;
00464 public:
00465     graph_node(graph& g) : my_graph(g) {
00466         my_graph.register_node(this);
00467     }
00468     virtual ~graph_node() {
00469         my_graph.remove_node(this);
00470     }
00471 };
00472 
00473 void graph::register_node(graph_node *n) {
00474     n->next = NULL;
00475     {
00476         spin_mutex::scoped_lock lock(nodelist_mutex);
00477         n->prev = my_nodes_last;
00478         if (my_nodes_last) my_nodes_last->next = n;
00479         my_nodes_last = n;
00480         if (!my_nodes) my_nodes = n;
00481     }
00482 }
00483 
00484 void graph::remove_node(graph_node *n) {
00485     {
00486         spin_mutex::scoped_lock lock(nodelist_mutex);
00487         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
00488         if (n->prev) n->prev->next = n->next;
00489         if (n->next) n->next->prev = n->prev;
00490         if (my_nodes_last == n) my_nodes_last = n->prev;
00491         if (my_nodes == n) my_nodes = n->next;
00492     }
00493     n->prev = n->next = NULL;
00494 }
00495 
00496 #include "internal/_flow_graph_node_impl.h"
00497 
00499 template < typename Output >
00500 class source_node : public graph_node, public sender< Output > {
00501     using graph_node::my_graph;
00502 public:
00504     typedef Output output_type;
00505 
00507     typedef receiver< Output > successor_type;
00508 
00510     template< typename Body >
00511     source_node( graph &g, Body body, bool is_active = true )
00512         : graph_node(g), my_root_task(g.root_task()), my_active(is_active), init_my_active(is_active),
00513         my_body( new internal::source_body_leaf< output_type, Body>(body) ),
00514         my_reserved(false), my_has_cached_item(false)
00515     {
00516         my_successors.set_owner(this);
00517     }
00518 
00520     source_node( const source_node& src ) :
00521         graph_node(src.my_graph), sender<Output>(),
00522         my_root_task( src.my_root_task), my_active(src.init_my_active),
00523         init_my_active(src.init_my_active), my_body( src.my_body->clone() ),
00524         my_reserved(false), my_has_cached_item(false)
00525     {
00526         my_successors.set_owner(this);
00527     }
00528 
00530     ~source_node() { delete my_body; }
00531 
00533     /* override */ bool register_successor( receiver<output_type> &r ) {
00534         spin_mutex::scoped_lock lock(my_mutex);
00535         my_successors.register_successor(r);
00536         if ( my_active )
00537             spawn_put();
00538         return true;
00539     }
00540 
00542     /* override */ bool remove_successor( receiver<output_type> &r ) {
00543         spin_mutex::scoped_lock lock(my_mutex);
00544         my_successors.remove_successor(r);
00545         return true;
00546     }
00547 
00549     /*override */ bool try_get( output_type &v ) {
00550         spin_mutex::scoped_lock lock(my_mutex);
00551         if ( my_reserved )
00552             return false;
00553 
00554         if ( my_has_cached_item ) {
00555             v = my_cached_item;
00556             my_has_cached_item = false;
00557         } else if ( (*my_body)(v) == false ) {
00558             return false;
00559         }
00560         return true;
00561     }
00562 
00564     /* override */ bool try_reserve( output_type &v ) {
00565         spin_mutex::scoped_lock lock(my_mutex);
00566         if ( my_reserved ) {
00567             return false;
00568         }
00569 
00570         if ( !my_has_cached_item && (*my_body)(my_cached_item) )
00571             my_has_cached_item = true;
00572 
00573         if ( my_has_cached_item ) {
00574             v = my_cached_item;
00575             my_reserved = true;
00576             return true;
00577         } else {
00578             return false;
00579         }
00580     }
00581 
00583 
00584     /* override */ bool try_release( ) {
00585         spin_mutex::scoped_lock lock(my_mutex);
00586         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
00587         my_reserved = false;
00588         if(!my_successors.empty())
00589             spawn_put();
00590         return true;
00591     }
00592 
00594     /* override */ bool try_consume( ) {
00595         spin_mutex::scoped_lock lock(my_mutex);
00596         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
00597         my_reserved = false;
00598         my_has_cached_item = false;
00599         if ( !my_successors.empty() ) {
00600             spawn_put();
00601         }
00602         return true;
00603     }
00604 
00606     void activate() {
00607         spin_mutex::scoped_lock lock(my_mutex);
00608         my_active = true;
00609         if ( !my_successors.empty() )
00610             spawn_put();
00611     }
00612 
00613 private:
00614     task *my_root_task;
00615     spin_mutex my_mutex;
00616     bool my_active;
00617     bool init_my_active;
00618     internal::source_body<output_type> *my_body;
00619     internal::broadcast_cache< output_type > my_successors;
00620     bool my_reserved;
00621     bool my_has_cached_item;
00622     output_type my_cached_item;
00623 
00624     friend class internal::source_task< source_node< output_type > >;
00625 
00627     /* override */ void apply_body( ) {
00628         output_type v;
00629         if ( try_reserve(v) == false )
00630             return;
00631 
00632         if ( my_successors.try_put( v ) )
00633             try_consume();
00634         else
00635             try_release();
00636     }
00637 
00639     /* override */ void spawn_put( ) {
00640         task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00641            internal::source_task< source_node< output_type > >( *this ) );
00642     }
00643 };
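
A minimal sketch (not part of the header) of a source_node: the body is applied repeatedly until it returns false, and each produced item is offered to the successors. Creating the node inactive and calling activate() after the edge exists avoids losing items; counter_body and the use of make_edge (defined later in this header) are illustrative.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    // Emits the integers 0..9, one per call; returns false when exhausted.
    struct counter_body {
        int my_next;
        counter_body() : my_next(0) {}
        bool operator()(int &out) {
            if (my_next >= 10) return false;
            out = my_next++;
            return true;
        }
    };

    struct print_body {
        int operator()(int v) const { std::printf("got %d\n", v); return v; }
    };

    int main() {
        tbb::flow::graph g;
        tbb::flow::source_node<int> src(g, counter_body(), /*is_active*/ false);
        tbb::flow::function_node<int, int> printer(g, tbb::flow::serial, print_body());
        tbb::flow::make_edge(src, printer);
        src.activate();          // start generating now that the successor is attached
        g.wait_for_all();
        return 0;
    }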
00644 
00646 template < typename Input, typename Output = continue_msg, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00647 class function_node : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00648     using graph_node::my_graph;
00649 public:
00650     typedef Input input_type;
00651     typedef Output output_type;
00652     typedef sender< input_type > predecessor_type;
00653     typedef receiver< output_type > successor_type;
00654     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00655     typedef internal::function_output<output_type> fOutput_type;
00656 
00658     template< typename Body >
00659     function_node( graph &g, size_t concurrency, Body body ) :
00660         graph_node(g), internal::function_input<input_type,output_type,Allocator>(g, concurrency, body)
00661     {}
00662 
00664     function_node( const function_node& src ) :
00665         graph_node(src.my_graph), internal::function_input<input_type,output_type,Allocator>( src ),
00666         fOutput_type()
00667     {}
00668 
00669     bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00670 
00671 protected:
00672     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00673 };
00674 
00676 template < typename Input, typename Output, typename Allocator >
00677 class function_node<Input,Output,queueing,Allocator> : public graph_node, public internal::function_input<Input,Output,Allocator>, public internal::function_output<Output> {
00678     using graph_node::my_graph;
00679 public:
00680     typedef Input input_type;
00681     typedef Output output_type;
00682     typedef sender< input_type > predecessor_type;
00683     typedef receiver< output_type > successor_type;
00684     typedef internal::function_input<input_type,output_type,Allocator> fInput_type;
00685     typedef internal::function_input_queue<input_type, Allocator> queue_type;
00686     typedef internal::function_output<output_type> fOutput_type;
00687 
00689     template< typename Body >
00690     function_node( graph &g, size_t concurrency, Body body ) :
00691         graph_node(g), fInput_type( g, concurrency, body, new queue_type() )
00692     {}
00693 
00695     function_node( const function_node& src ) :
00696         graph_node(src.my_graph), fInput_type( src, new queue_type() ), fOutput_type()
00697     {}
00698 
00699     bool try_put(const input_type &i) { return fInput_type::try_put(i); }
00700 
00701 protected:
00702     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00703 };
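
Illustrative sketch (not part of the header): two function_nodes connected with make_edge (defined later in this header). The first runs with unlimited concurrency, the second serially; items are injected with try_put and wait_for_all() drains the graph.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    struct square_body { int operator()(int v) const { return v * v; } };
    struct print_body  { int operator()(int v) const { std::printf("%d\n", v); return v; } };

    int main() {
        tbb::flow::graph g;
        tbb::flow::function_node<int, int> square(g, tbb::flow::unlimited, square_body());
        tbb::flow::function_node<int, int> print(g, tbb::flow::serial, print_body());
        tbb::flow::make_edge(square, print);
        for (int i = 0; i < 5; ++i)
            square.try_put(i);
        g.wait_for_all();
        return 0;
    }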
00704 
00705 #include "tbb/internal/_flow_graph_types_impl.h"
00706 
00708 // Output is a tuple of output types.
00709 template < typename Input, typename Output, graph_buffer_policy = queueing, typename Allocator=cache_aligned_allocator<Input> >
00710 class multifunction_node :
00711     public graph_node,
00712     public internal::multifunction_input
00713     <
00714         Input,
00715         typename internal::wrap_tuple_elements<
00716             std::tuple_size<Output>::value,  // #elements in tuple
00717             internal::function_output,  // wrap this around each element
00718             Output // the tuple providing the types
00719         >::type,
00720         Allocator
00721     > {
00722     using graph_node::my_graph;
00723 private:
00724     static const int N = std::tuple_size<Output>::value;
00725 public:
00726     typedef Input input_type;
00727     typedef typename internal::wrap_tuple_elements<N,internal::function_output, Output>::type output_ports_type;
00728 private:
00729     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00730     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00731 public:
00732     template<typename Body>
00733     multifunction_node( graph &g, size_t concurrency, Body body ) :
00734         graph_node(g), base_type(g,concurrency, body)
00735     {}
00736     multifunction_node( const multifunction_node &other) :
00737         graph_node(other.my_graph), base_type(other)
00738     {}
00739     // all the guts are in multifunction_input...
00740 };  // multifunction_node
00741 
00742 template < typename Input, typename Output, typename Allocator >
00743 class multifunction_node<Input,Output,queueing,Allocator> : public graph_node, public internal::multifunction_input<Input,
00744     typename internal::wrap_tuple_elements<std::tuple_size<Output>::value, internal::function_output, Output>::type, Allocator> {
00745     using graph_node::my_graph;
00746     static const int N = std::tuple_size<Output>::value;
00747 public:
00748     typedef Input input_type;
00749     typedef typename internal::wrap_tuple_elements<N, internal::function_output, Output>::type output_ports_type;
00750 private:
00751     typedef typename internal::multifunction_input<input_type, output_ports_type, Allocator> base_type;
00752     typedef typename internal::function_input_queue<input_type,Allocator> queue_type;
00753 public:
00754     template<typename Body>
00755     multifunction_node( graph &g, size_t concurrency, Body body) :
00756         graph_node(g), base_type(g,concurrency, body, new queue_type())
00757     {}
00758     multifunction_node( const multifunction_node &other) :
00759         graph_node(other.my_graph), base_type(other, new queue_type())
00760     {}
00761 };  // multifunction_node
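
Illustrative sketch (not part of the header): a multifunction_node whose body routes each input to one of its output ports via std::get on output_ports_type. It assumes a compiler that provides std::tuple, and the output_port<N>() helper used to wire the ports to successors is assumed to be declared later in this header; the routing body is made up.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    typedef tbb::flow::multifunction_node< int, std::tuple<int, int> > router_node_t;

    // Sends even inputs to port 0 and odd inputs to port 1.
    struct router_body {
        void operator()(const int &v, router_node_t::output_ports_type &ports) {
            if (v % 2 == 0) std::get<0>(ports).try_put(v);
            else            std::get<1>(ports).try_put(v);
        }
    };

    int main() {
        tbb::flow::graph g;
        router_node_t router(g, tbb::flow::unlimited, router_body());
        tbb::flow::queue_node<int> evens(g), odds(g);
        tbb::flow::make_edge(tbb::flow::output_port<0>(router), evens);
        tbb::flow::make_edge(tbb::flow::output_port<1>(router), odds);
        for (int i = 0; i < 6; ++i)
            router.try_put(i);
        g.wait_for_all();
        int v;
        while (evens.try_get(v)) std::printf("even: %d\n", v);
        while (odds.try_get(v))  std::printf("odd:  %d\n", v);
        return 0;
    }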
00762 
00763 //! split_node: accepts a tuple as input and forwards each element of the tuple to its
00764 //  successors.  The node has unlimited concurrency, so though it is marked as
00765 //  "rejecting" it does not reject inputs.
00766 template<typename TupleType, typename Allocator=cache_aligned_allocator<TupleType> >
00767 class split_node : public multifunction_node<TupleType, TupleType, rejecting, Allocator> {
00768     static const int N = std::tuple_size<TupleType>::value;
00769     typedef multifunction_node<TupleType,TupleType,rejecting,Allocator> base_type;
00770 public:
00771     typedef typename base_type::output_ports_type output_ports_type;
00772 private:
00773     struct splitting_body {
00774         void operator()(const TupleType& t, output_ports_type &p) {
00775             internal::emit_element<N>::emit_this(t, p);
00776         }
00777     };
00778 public:
00779     typedef TupleType input_type;
00780     typedef Allocator allocator_type;
00781     split_node(graph &g) : base_type(g, unlimited, splitting_body()) {}
00782     split_node( const split_node & other) : base_type(other) {}
00783 };
00784 
00786 template <typename Output>
00787 class continue_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
00788     using graph_node::my_graph;
00789 public:
00790     typedef continue_msg input_type;
00791     typedef Output output_type;
00792     typedef sender< input_type > predecessor_type;
00793     typedef receiver< output_type > successor_type;
00794     typedef internal::function_output<output_type> fOutput_type;
00795 
00797     template <typename Body >
00798     continue_node( graph &g, Body body ) :
00799         graph_node(g), internal::continue_input<output_type>( g, body )
00800     {}
00801 
00803     template <typename Body >
00804     continue_node( graph &g, int number_of_predecessors, Body body ) :
00805         graph_node(g), internal::continue_input<output_type>( g, number_of_predecessors, body )
00806     {}
00807 
00809     continue_node( const continue_node& src ) :
00810         graph_node(src.my_graph), internal::continue_input<output_type>(src),
00811         internal::function_output<Output>()
00812     {}
00813 
00814     bool try_put(const input_type &i) { return internal::continue_input<Output>::try_put(i); }
00815 
00816 protected:
00817     /* override */ internal::broadcast_cache<output_type> &successors () { return fOutput_type::my_successors; }
00818 };
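
Illustrative sketch (not part of the header) of a dependence graph built from continue_nodes: D runs only after both B and C have completed, because edges made to a continue_node contribute to its predecessor count. The node and body names are made up.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    struct step_body {
        const char *my_name;
        step_body(const char *name) : my_name(name) {}
        tbb::flow::continue_msg operator()(const tbb::flow::continue_msg &) const {
            std::printf("%s\n", my_name);
            return tbb::flow::continue_msg();
        }
    };

    int main() {
        tbb::flow::graph g;
        typedef tbb::flow::continue_node<tbb::flow::continue_msg> node_t;
        node_t A(g, step_body("A"));
        node_t B(g, step_body("B"));
        node_t C(g, step_body("C"));
        node_t D(g, step_body("D"));
        tbb::flow::make_edge(A, B);
        tbb::flow::make_edge(A, C);
        tbb::flow::make_edge(B, D);   // D now waits for B ...
        tbb::flow::make_edge(C, D);   // ... and for C
        A.try_put(tbb::flow::continue_msg());
        g.wait_for_all();
        return 0;
    }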
00819 
00820 template< typename T >
00821 class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
00822     using graph_node::my_graph;
00823 public:
00824     typedef T input_type;
00825     typedef T output_type;
00826     typedef sender< input_type > predecessor_type;
00827     typedef receiver< output_type > successor_type;
00828 
00829     overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
00830         my_successors.set_owner( this );
00831     }
00832 
00833     // Copy constructor; doesn't take anything from src; default won't work
00834     overwrite_node( const overwrite_node& src ) :
00835         graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
00836     {
00837         my_successors.set_owner( this );
00838     }
00839 
00840     ~overwrite_node() {}
00841 
00842     /* override */ bool register_successor( successor_type &s ) {
00843         spin_mutex::scoped_lock l( my_mutex );
00844         if ( my_buffer_is_valid ) {
00845             // We have a valid value that must be forwarded immediately.
00846             if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
00847                 // We add the successor: it accepted our put, or it rejected the put but won't let us become a predecessor
00848                 my_successors.register_successor( s );
00849                 return true;
00850             } else {
00851                 // We don't add the successor: it rejected our put and we became its predecessor instead
00852                 return false;
00853             }
00854         } else {
00855             // No valid value yet, just add as successor
00856             my_successors.register_successor( s );
00857             return true;
00858         }
00859     }
00860 
00861     /* override */ bool remove_successor( successor_type &s ) {
00862         spin_mutex::scoped_lock l( my_mutex );
00863         my_successors.remove_successor(s);
00864         return true;
00865     }
00866 
00867     /* override */ bool try_put( const T &v ) {
00868         spin_mutex::scoped_lock l( my_mutex );
00869         my_buffer = v;
00870         my_buffer_is_valid = true;
00871         my_successors.try_put(v);
00872         return true;
00873     }
00874 
00875     /* override */ bool try_get( T &v ) {
00876         spin_mutex::scoped_lock l( my_mutex );
00877         if ( my_buffer_is_valid ) {
00878             v = my_buffer;
00879             return true;
00880         } else {
00881             return false;
00882         }
00883     }
00884 
00885     bool is_valid() {
00886        spin_mutex::scoped_lock l( my_mutex );
00887        return my_buffer_is_valid;
00888     }
00889 
00890     void clear() {
00891        spin_mutex::scoped_lock l( my_mutex );
00892        my_buffer_is_valid = false;
00893     }
00894 
00895 protected:
00896     spin_mutex my_mutex;
00897     internal::broadcast_cache< T, null_rw_mutex > my_successors;
00898     T my_buffer;
00899     bool my_buffer_is_valid;
00900 };
00901 
00902 template< typename T >
00903 class write_once_node : public overwrite_node<T> {
00904 public:
00905     typedef T input_type;
00906     typedef T output_type;
00907     typedef sender< input_type > predecessor_type;
00908     typedef receiver< output_type > successor_type;
00909 
00911     write_once_node(graph& g) : overwrite_node<T>(g) {}
00912 
00914     write_once_node( const write_once_node& src ) : overwrite_node<T>(src) {}
00915 
00916     /* override */ bool try_put( const T &v ) {
00917         spin_mutex::scoped_lock l( this->my_mutex );
00918         if ( this->my_buffer_is_valid ) {
00919             return false;
00920         } else {
00921             this->my_buffer = v;
00922             this->my_buffer_is_valid = true;
00923             this->my_successors.try_put(v);
00924             return true;
00925         }
00926     }
00927 };
00928 
00930 template <typename T>
00931 class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
00932     using graph_node::my_graph;
00933     internal::broadcast_cache<T> my_successors;
00934 public:
00935     typedef T input_type;
00936     typedef T output_type;
00937     typedef sender< input_type > predecessor_type;
00938     typedef receiver< output_type > successor_type;
00939 
00940     broadcast_node(graph& g) : graph_node(g) {
00941         my_successors.set_owner( this );
00942     }
00943 
00944     // Copy constructor
00945     broadcast_node( const broadcast_node& src ) :
00946         graph_node(src.my_graph), receiver<T>(), sender<T>()
00947     {
00948         my_successors.set_owner( this );
00949     }
00950 
00952     virtual bool register_successor( receiver<T> &r ) {
00953         my_successors.register_successor( r );
00954         return true;
00955     }
00956 
00958     virtual bool remove_successor( receiver<T> &r ) {
00959         my_successors.remove_successor( r );
00960         return true;
00961     }
00962 
00963     /* override */ bool try_put( const T &t ) {
00964         my_successors.try_put(t);
00965         return true;
00966     }
00967 };
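
Illustrative sketch (not part of the header): a broadcast_node forwards every message it receives to all registered successors; the node and body names are made up.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    struct print_body {
        const char *my_name;
        print_body(const char *name) : my_name(name) {}
        int operator()(int v) const { std::printf("%s received %d\n", my_name, v); return v; }
    };

    int main() {
        tbb::flow::graph g;
        tbb::flow::broadcast_node<int> bcast(g);
        tbb::flow::function_node<int, int> f1(g, tbb::flow::unlimited, print_body("f1"));
        tbb::flow::function_node<int, int> f2(g, tbb::flow::unlimited, print_body("f2"));
        tbb::flow::make_edge(bcast, f1);   // both successors get a copy of every message
        tbb::flow::make_edge(bcast, f2);
        bcast.try_put(42);
        g.wait_for_all();
        return 0;
    }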
00968 
00969 #include "internal/_flow_graph_item_buffer_impl.h"
00970 
00972 template <typename T, typename A=cache_aligned_allocator<T> >
00973 class buffer_node : public graph_node, public reservable_item_buffer<T, A>, public receiver<T>, public sender<T> {
00974     using graph_node::my_graph;
00975 public:
00976     typedef T input_type;
00977     typedef T output_type;
00978     typedef sender< input_type > predecessor_type;
00979     typedef receiver< output_type > successor_type;
00980     typedef buffer_node<T, A> my_class;
00981 protected:
00982     typedef size_t size_type;
00983     internal::round_robin_cache< T, null_rw_mutex > my_successors;
00984 
00985     task *my_parent;
00986 
00987     friend class internal::forward_task< buffer_node< T, A > >;
00988 
00989     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
00990     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00991 
00992     // implements the aggregator_operation concept
00993     class buffer_operation : public internal::aggregated_operation< buffer_operation > {
00994     public:
00995         char type;
00996         T *elem;
00997         successor_type *r;
00998         buffer_operation(const T& e, op_type t) :
00999             type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
01000         buffer_operation(op_type t) : type(char(t)), r(NULL) {}
01001     };
01002 
01003     bool forwarder_busy;
01004     typedef internal::aggregating_functor<my_class, buffer_operation> my_handler;
01005     friend class internal::aggregating_functor<my_class, buffer_operation>;
01006     internal::aggregator< my_handler, buffer_operation> my_aggregator;
01007 
01008     virtual void handle_operations(buffer_operation *op_list) {
01009         buffer_operation *tmp;
01010         bool try_forwarding=false;
01011         while (op_list) {
01012             tmp = op_list;
01013             op_list = op_list->next;
01014             switch (tmp->type) {
01015             case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
01016             case rem_succ: internal_rem_succ(tmp); break;
01017             case req_item: internal_pop(tmp); break;
01018             case res_item: internal_reserve(tmp); break;
01019             case rel_res:  internal_release(tmp);  try_forwarding = true; break;
01020             case con_res:  internal_consume(tmp);  try_forwarding = true; break;
01021             case put_item: internal_push(tmp);  try_forwarding = true; break;
01022             case try_fwd:  internal_forward(tmp); break;
01023             }
01024         }
01025         if (try_forwarding && !forwarder_busy) {
01026             forwarder_busy = true;
01027             task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type, A> >(*this));
01028         }
01029     }
01030 
01032     virtual void forward() {
01033         buffer_operation op_data(try_fwd);
01034         do {
01035             op_data.status = WAIT;
01036             my_aggregator.execute(&op_data);
01037         } while (op_data.status == SUCCEEDED);
01038     }
01039 
01041     virtual void internal_reg_succ(buffer_operation *op) {
01042         my_successors.register_successor(*(op->r));
01043         __TBB_store_with_release(op->status, SUCCEEDED);
01044     }
01045 
01047     virtual void internal_rem_succ(buffer_operation *op) {
01048         my_successors.remove_successor(*(op->r));
01049         __TBB_store_with_release(op->status, SUCCEEDED);
01050     }
01051 
01053     virtual void internal_forward(buffer_operation *op) {
01054         T i_copy;
01055         bool success = false; // flagged when a successor accepts
01056         size_type counter = my_successors.size();
01057         // Try forwarding, giving each successor a chance
01058         while (counter>0 && !this->buffer_empty() && this->item_valid(this->my_tail-1)) {
01059             this->fetch_back(i_copy);
01060             if( my_successors.try_put(i_copy) ) {
01061                 this->invalidate_back();
01062                 --(this->my_tail);
01063                 success = true; // found an accepting successor
01064             }
01065             --counter;
01066         }
01067         if (success && !counter)
01068             __TBB_store_with_release(op->status, SUCCEEDED);
01069         else {
01070             __TBB_store_with_release(op->status, FAILED);
01071             forwarder_busy = false;
01072         }
01073     }
01074 
01075     virtual void internal_push(buffer_operation *op) {
01076         this->push_back(*(op->elem));
01077         __TBB_store_with_release(op->status, SUCCEEDED);
01078     }
01079 
01080     virtual void internal_pop(buffer_operation *op) {
01081         if(this->pop_back(*(op->elem))) {
01082             __TBB_store_with_release(op->status, SUCCEEDED);
01083         }
01084         else {
01085             __TBB_store_with_release(op->status, FAILED);
01086         }
01087     }
01088 
01089     virtual void internal_reserve(buffer_operation *op) {
01090         if(this->reserve_front(*(op->elem))) {
01091             __TBB_store_with_release(op->status, SUCCEEDED);
01092         }
01093         else {
01094             __TBB_store_with_release(op->status, FAILED);
01095         }
01096     }
01097 
01098     virtual void internal_consume(buffer_operation *op) {
01099         this->consume_front();
01100         __TBB_store_with_release(op->status, SUCCEEDED);
01101     }
01102 
01103     virtual void internal_release(buffer_operation *op) {
01104         this->release_front();
01105         __TBB_store_with_release(op->status, SUCCEEDED);
01106     }
01107 
01108 public:
01110     buffer_node( graph &g ) : graph_node(g), reservable_item_buffer<T>(),
01111         my_parent( g.root_task() ), forwarder_busy(false) {
01112         my_successors.set_owner(this);
01113         my_aggregator.initialize_handler(my_handler(this));
01114     }
01115 
01117     buffer_node( const buffer_node& src ) : graph_node(src.my_graph),
01118         reservable_item_buffer<T>(), receiver<T>(), sender<T>(),
01119         my_parent( src.my_parent ) {
01120         forwarder_busy = false;
01121         my_successors.set_owner(this);
01122         my_aggregator.initialize_handler(my_handler(this));
01123     }
01124 
01125     virtual ~buffer_node() {}
01126 
01127     //
01128     // message sender implementation
01129     //
01130 
01132 
01133     /* override */ bool register_successor( receiver<output_type> &r ) {
01134         buffer_operation op_data(reg_succ);
01135         op_data.r = &r;
01136         my_aggregator.execute(&op_data);
01137         return true;
01138     }
01139 
01141 
01143     /* override */ bool remove_successor( receiver<output_type> &r ) {
01144         r.remove_predecessor(*this);
01145         buffer_operation op_data(rem_succ);
01146         op_data.r = &r;
01147         my_aggregator.execute(&op_data);
01148         return true;
01149     }
01150 
01152 
01154     /* override */ bool try_get( T &v ) {
01155         buffer_operation op_data(req_item);
01156         op_data.elem = &v;
01157         my_aggregator.execute(&op_data);
01158         return (op_data.status==SUCCEEDED);
01159     }
01160 
01162 
01164     /* override */ bool try_reserve( T &v ) {
01165         buffer_operation op_data(res_item);
01166         op_data.elem = &v;
01167         my_aggregator.execute(&op_data);
01168         return (op_data.status==SUCCEEDED);
01169     }
01170 
01172 
01173     /* override */ bool try_release() {
01174         buffer_operation op_data(rel_res);
01175         my_aggregator.execute(&op_data);
01176         return true;
01177     }
01178 
01180 
01181     /* override */ bool try_consume() {
01182         buffer_operation op_data(con_res);
01183         my_aggregator.execute(&op_data);
01184         return true;
01185     }
01186 
01188 
01189     /* override */ bool try_put(const T &t) {
01190         buffer_operation op_data(t, put_item);
01191         my_aggregator.execute(&op_data);
01192         return true;
01193     }
01194 };
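
Illustrative sketch (not part of the header): with no successors attached, items put to a buffer_node stay buffered and can be drained with try_get; the retrieval order of a plain buffer_node is unspecified.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    int main() {
        tbb::flow::graph g;
        tbb::flow::buffer_node<int> buf(g);
        buf.try_put(1);
        buf.try_put(2);
        buf.try_put(3);
        g.wait_for_all();          // let any internally enqueued forwarding tasks finish
        int v;
        while (buf.try_get(v))     // drain explicitly; order is unspecified
            std::printf("%d\n", v);
        return 0;
    }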
01195 
01197 template <typename T, typename A=cache_aligned_allocator<T> >
01198 class queue_node : public buffer_node<T, A> {
01199 protected:
01200     typedef typename buffer_node<T, A>::size_type size_type;
01201     typedef typename buffer_node<T, A>::buffer_operation queue_operation;
01202 
01203     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01204 
01206     /* override */ void internal_forward(queue_operation *op) {
01207         T i_copy;
01208         bool success = false; // flagged when a successor accepts
01209         size_type counter = this->my_successors.size();
01210         if (this->my_reserved || !this->item_valid(this->my_head)) {
01211             __TBB_store_with_release(op->status, FAILED);
01212             this->forwarder_busy = false;
01213             return;
01214         }
01215         // Keep trying to send items while there is at least one accepting successor
01216         while (counter>0 && this->item_valid(this->my_head)) {
01217             this->fetch_front(i_copy);
01218             if(this->my_successors.try_put(i_copy)) {
01219                  this->invalidate_front();
01220                  ++(this->my_head);
01221                 success = true; // found an accepting successor
01222             }
01223             --counter;
01224         }
01225         if (success && !counter)
01226             __TBB_store_with_release(op->status, SUCCEEDED);
01227         else {
01228             __TBB_store_with_release(op->status, FAILED);
01229             this->forwarder_busy = false;
01230         }
01231     }
01232 
01233     /* override */ void internal_pop(queue_operation *op) {
01234         if ( this->my_reserved || !this->item_valid(this->my_head)){
01235             __TBB_store_with_release(op->status, FAILED);
01236         }
01237         else {
01238             this->pop_front(*(op->elem));
01239             __TBB_store_with_release(op->status, SUCCEEDED);
01240         }
01241     }
01242     /* override */ void internal_reserve(queue_operation *op) {
01243         if (this->my_reserved || !this->item_valid(this->my_head)) {
01244             __TBB_store_with_release(op->status, FAILED);
01245         }
01246         else {
01247             this->my_reserved = true;
01248             this->fetch_front(*(op->elem));
01249             this->invalidate_front();
01250             __TBB_store_with_release(op->status, SUCCEEDED);
01251         }
01252     }
01253     /* override */ void internal_consume(queue_operation *op) {
01254         this->consume_front();
01255         __TBB_store_with_release(op->status, SUCCEEDED);
01256     }
01257 
01258 public:
01259     typedef T input_type;
01260     typedef T output_type;
01261     typedef sender< input_type > predecessor_type;
01262     typedef receiver< output_type > successor_type;
01263 
01265     queue_node( graph &g ) : buffer_node<T, A>(g) {}
01266 
01268     queue_node( const queue_node& src) : buffer_node<T, A>(src) {}
01269 };
01270 
01272 template< typename T, typename A=cache_aligned_allocator<T> >
01273 class sequencer_node : public queue_node<T, A> {
01274     internal::function_body< T, size_t > *my_sequencer;
01275 public:
01276     typedef T input_type;
01277     typedef T output_type;
01278     typedef sender< input_type > predecessor_type;
01279     typedef receiver< output_type > successor_type;
01280 
01282     template< typename Sequencer >
01283     sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, A>(g),
01284         my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01285 
01287     sequencer_node( const sequencer_node& src ) : queue_node<T, A>(src),
01288         my_sequencer( src.my_sequencer->clone() ) {}
01289 
01291     ~sequencer_node() { delete my_sequencer; }
01292 protected:
01293     typedef typename buffer_node<T, A>::size_type size_type;
01294     typedef typename buffer_node<T, A>::buffer_operation sequencer_operation;
01295 
01296     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01297 
01298 private:
01299     /* override */ void internal_push(sequencer_operation *op) {
01300         size_type tag = (*my_sequencer)(*(op->elem));
01301 
01302         this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01303 
01304         if(this->size() > this->capacity())
01305             this->grow_my_array(this->size());  // tail already has 1 added to it
01306         this->item(tag) = std::make_pair( *(op->elem), true );
01307         __TBB_store_with_release(op->status, SUCCEEDED);
01308     }
01309 };
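
Illustrative sketch (not part of the header): a sequencer_node releases items in the order given by the user-supplied sequencer body, which maps each item to its sequence number starting from 0. The work_item type and bodies are made up.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    struct work_item {
        size_t my_seq;
        int my_value;
        work_item(size_t s = 0, int v = 0) : my_seq(s), my_value(v) {}
    };

    // Tells the sequencer where each item belongs in the output order.
    struct seq_body {
        size_t operator()(const work_item &w) const { return w.my_seq; }
    };

    struct print_body {
        tbb::flow::continue_msg operator()(const work_item &w) const {
            std::printf("value %d (seq %u)\n", w.my_value, (unsigned)w.my_seq);
            return tbb::flow::continue_msg();
        }
    };

    int main() {
        tbb::flow::graph g;
        tbb::flow::sequencer_node<work_item> seq(g, seq_body());
        tbb::flow::function_node<work_item, tbb::flow::continue_msg> print(g, tbb::flow::serial, print_body());
        tbb::flow::make_edge(seq, print);
        seq.try_put(work_item(2, 30));   // arrives early, held until items 0 and 1 pass through
        seq.try_put(work_item(0, 10));
        seq.try_put(work_item(1, 20));
        g.wait_for_all();                // prints the values in sequence order: 10, 20, 30
        return 0;
    }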
01310 
01312 template< typename T, typename Compare = std::less<T>, typename A=cache_aligned_allocator<T> >
01313 class priority_queue_node : public buffer_node<T, A> {
01314 public:
01315     typedef T input_type;
01316     typedef T output_type;
01317     typedef sender< input_type > predecessor_type;
01318     typedef receiver< output_type > successor_type;
01319 
01321     priority_queue_node( graph &g ) : buffer_node<T, A>(g), mark(0) {}
01322 
01324     priority_queue_node( const priority_queue_node &src ) : buffer_node<T, A>(src), mark(0) {}
01325 
01326 protected:
01327     typedef typename buffer_node<T, A>::size_type size_type;
01328     typedef typename buffer_node<T, A>::item_type item_type;
01329     typedef typename buffer_node<T, A>::buffer_operation prio_operation;
01330 
01331     enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01332 
01333     /* override */ void handle_operations(prio_operation *op_list) {
01334         prio_operation *tmp /*, *pop_list*/ ;
01335         bool try_forwarding=false;
01336         while (op_list) {
01337             tmp = op_list;
01338             op_list = op_list->next;
01339             switch (tmp->type) {
01340             case buffer_node<T, A>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01341             case buffer_node<T, A>::rem_succ: this->internal_rem_succ(tmp); break;
01342             case buffer_node<T, A>::put_item: internal_push(tmp); try_forwarding = true; break;
01343             case buffer_node<T, A>::try_fwd: internal_forward(tmp); break;
01344             case buffer_node<T, A>::rel_res: internal_release(tmp); try_forwarding = true; break;
01345             case buffer_node<T, A>::con_res: internal_consume(tmp); try_forwarding = true; break;
01346             case buffer_node<T, A>::req_item: internal_pop(tmp); break;
01347             case buffer_node<T, A>::res_item: internal_reserve(tmp); break;
01348             }
01349         }
01350         // process pops!  for now, no special pop processing
01351         if (mark<this->my_tail) heapify();
01352         if (try_forwarding && !this->forwarder_busy) {
01353             this->forwarder_busy = true;
01354             task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type, A> >(*this));
01355         }
01356     }
01357 
01359     /* override */ void internal_forward(prio_operation *op) {
01360         T i_copy;
01361         bool success = false; // flagged when a successor accepts
01362         size_type counter = this->my_successors.size();
01363 
01364         if (this->my_reserved || this->my_tail == 0) {
01365             __TBB_store_with_release(op->status, FAILED);
01366             this->forwarder_busy = false;
01367             return;
01368         }
01369         // Keep trying to send while there exists an accepting successor
01370         while (counter>0 && this->my_tail > 0) {
01371             i_copy = this->my_array[0].first;
01372             bool msg = this->my_successors.try_put(i_copy);
01373             if ( msg == true ) {
01374                  if (mark == this->my_tail) --mark;
01375                 --(this->my_tail);
01376                 this->my_array[0].first=this->my_array[this->my_tail].first;
01377                 if (this->my_tail > 1) // don't reheap for heap of size 1
01378                     reheap();
01379                 success = true; // found an accepting successor
01380             }
01381             --counter;
01382         }
01383         if (success && !counter)
01384             __TBB_store_with_release(op->status, SUCCEEDED);
01385         else {
01386             __TBB_store_with_release(op->status, FAILED);
01387             this->forwarder_busy = false;
01388         }
01389     }
01390 
01391     /* override */ void internal_push(prio_operation *op) {
01392         if ( this->my_tail >= this->my_array_size )
01393             this->grow_my_array( this->my_tail + 1 );
01394         this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01395         ++(this->my_tail);
01396         __TBB_store_with_release(op->status, SUCCEEDED);
01397     }
01398     /* override */ void internal_pop(prio_operation *op) {
01399         if ( this->my_reserved == true || this->my_tail == 0 ) {
01400             __TBB_store_with_release(op->status, FAILED);
01401         }
01402         else {
01403             if (mark<this->my_tail &&
01404                 compare(this->my_array[0].first,
01405                         this->my_array[this->my_tail-1].first)) {
01406                 // there are newly pushed elems; last one higher than top
01407                 // copy the data
01408                 *(op->elem) = this->my_array[this->my_tail-1].first;
01409                 --(this->my_tail);
01410                 __TBB_store_with_release(op->status, SUCCEEDED);
01411             }
01412             else { // extract and push the last element down heap
01413                 *(op->elem) = this->my_array[0].first; // copy the data
01414                 if (mark == this->my_tail) --mark;
01415                 --(this->my_tail);
01416                 __TBB_store_with_release(op->status, SUCCEEDED);
01417                 this->my_array[0].first=this->my_array[this->my_tail].first;
01418                 if (this->my_tail > 1) // don't reheap for heap of size 1
01419                     reheap();
01420             }
01421         }
01422     }
01423     /* override */ void internal_reserve(prio_operation *op) {
01424         if (this->my_reserved == true || this->my_tail == 0) {
01425             __TBB_store_with_release(op->status, FAILED);
01426         }
01427         else {
01428             this->my_reserved = true;
01429             *(op->elem) = reserved_item = this->my_array[0].first;
01430             if (mark == this->my_tail) --mark;
01431             --(this->my_tail);
01432             __TBB_store_with_release(op->status, SUCCEEDED);
01433             this->my_array[0].first = this->my_array[this->my_tail].first;
01434             if (this->my_tail > 1) // don't reheap for heap of size 1
01435                 reheap();
01436         }
01437     }
01438     /* override */ void internal_consume(prio_operation *op) {
01439         this->my_reserved = false;
01440         __TBB_store_with_release(op->status, SUCCEEDED);
01441     }
01442     /* override */ void internal_release(prio_operation *op) {
01443         if (this->my_tail >= this->my_array_size)
01444             this->grow_my_array( this->my_tail + 1 );
01445         this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01446         ++(this->my_tail);
01447         this->my_reserved = false;
01448         __TBB_store_with_release(op->status, SUCCEEDED);
01449         heapify();
01450     }
01451 private:
01452     Compare compare;
01453     size_type mark;
01454     input_type reserved_item;
01455 
01456     void heapify() {
01457         if (!mark) mark = 1;
01458         for (; mark<this->my_tail; ++mark) { // for each unheaped element
01459             size_type cur_pos = mark;
01460             input_type to_place = this->my_array[mark].first;
01461             do { // push to_place up the heap
01462                 size_type parent = (cur_pos-1)>>1;
01463                 if (!compare(this->my_array[parent].first, to_place))
01464                     break;
01465                 this->my_array[cur_pos].first = this->my_array[parent].first;
01466                 cur_pos = parent;
01467             } while( cur_pos );
01468             this->my_array[cur_pos].first = to_place;
01469         }
01470     }
01471 
01472     void reheap() {
01473         size_type cur_pos=0, child=1;
01474         while (child < mark) {
01475             size_type target = child;
01476             if (child+1<mark &&
01477                 compare(this->my_array[child].first,
01478                         this->my_array[child+1].first))
01479                 ++target;
01480             // target now has the higher priority child
01481             if (compare(this->my_array[target].first,
01482                         this->my_array[this->my_tail].first))
01483                 break;
01484             this->my_array[cur_pos].first = this->my_array[target].first;
01485             cur_pos = target;
01486             child = (cur_pos<<1)+1;
01487         }
01488         this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01489     }
01490 };
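
Illustrative sketch (not part of the header): a priority_queue_node with the default Compare = std::less<T> hands out the largest buffered item first.

    #include <cstdio>
    #include "tbb/flow_graph.h"

    int main() {
        tbb::flow::graph g;
        tbb::flow::priority_queue_node<int> pq(g);   // default Compare is std::less<int>
        pq.try_put(3);
        pq.try_put(7);
        pq.try_put(5);
        g.wait_for_all();
        int v;
        while (pq.try_get(v))        // retrieves 7, then 5, then 3
            std::printf("%d\n", v);
        return 0;
    }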
01491 
01493 
01496 template< typename T >
01497 class limiter_node : public graph_node, public receiver< T >, public sender< T > {
01498     using graph_node::my_graph;
01499 public:
01500     typedef T input_type;
01501     typedef T output_type;
01502     typedef sender< input_type > predecessor_type;
01503     typedef receiver< output_type > successor_type;
01504 
01505 private:
01506     task *my_root_task;
01507     size_t my_threshold;
01508     size_t my_count;
01509     internal::predecessor_cache< T > my_predecessors;
01510     spin_mutex my_mutex;
01511     internal::broadcast_cache< T > my_successors;
01512     int init_decrement_predecessors;
01513 
01514     friend class internal::forward_task< limiter_node<T> >;
01515 
01516     // Let decrementer call decrement_counter()
01517     friend class internal::decrementer< limiter_node<T> >;
01518 
01519     void decrement_counter() {
01520         input_type v;
01521 
01522         // If we can't get / put an item immediately then drop the count
01523         if ( my_predecessors.get_item( v ) == false
01524              || my_successors.try_put(v) == false ) {
01525             spin_mutex::scoped_lock lock(my_mutex);
01526             --my_count;
01527             if ( !my_predecessors.empty() )
01528                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01529                             internal::forward_task< limiter_node<T> >( *this ) );
01530         }
01531     }
01532 
01533     void forward() {
01534         {
01535             spin_mutex::scoped_lock lock(my_mutex);
01536             if ( my_count < my_threshold )
01537                 ++my_count;
01538             else
01539                 return;
01540         }
01541         decrement_counter();
01542     }
01543 
01544 public:
01546     internal::decrementer< limiter_node<T> > decrement;
01547 
01549     limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0) :
01550         graph_node(g), my_root_task(g.root_task()), my_threshold(threshold), my_count(0),
01551         init_decrement_predecessors(num_decrement_predecessors),
01552         decrement(num_decrement_predecessors)
01553     {
01554         my_predecessors.set_owner(this);
01555         my_successors.set_owner(this);
01556         decrement.set_owner(this);
01557     }
01558 
01560     limiter_node( const limiter_node& src ) :
01561         graph_node(src.my_graph), receiver<T>(), sender<T>(),
01562         my_root_task(src.my_root_task), my_threshold(src.my_threshold), my_count(0),
01563         init_decrement_predecessors(src.init_decrement_predecessors),
01564         decrement(src.init_decrement_predecessors)
01565     {
01566         my_predecessors.set_owner(this);
01567         my_successors.set_owner(this);
01568         decrement.set_owner(this);
01569     }
01570 
01572     /* override */ bool register_successor( receiver<output_type> &r ) {
01573         my_successors.register_successor(r);
01574         return true;
01575     }
01576 
01578 
01579     /* override */ bool remove_successor( receiver<output_type> &r ) {
01580         r.remove_predecessor(*this);
01581         my_successors.remove_successor(r);
01582         return true;
01583     }
01584 
01586     /* override */ bool try_put( const T &t ) {
01587         {
01588             spin_mutex::scoped_lock lock(my_mutex);
01589             if ( my_count >= my_threshold )
01590                 return false;
01591             else
01592                 ++my_count;
01593         }
01594 
01595         bool msg = my_successors.try_put(t);
01596 
01597         if ( msg != true ) {
01598             spin_mutex::scoped_lock lock(my_mutex);
01599             --my_count;
01600             if ( !my_predecessors.empty() )
01601                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01602                             internal::forward_task< limiter_node<T> >( *this ) );
01603         }
01604 
01605         return msg;
01606     }
01607 
01609     /* override */ bool register_predecessor( predecessor_type &src ) {
01610         spin_mutex::scoped_lock lock(my_mutex);
01611         my_predecessors.add( src );
01612         if ( my_count < my_threshold && !my_successors.empty() )
01613             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01614                            internal::forward_task< limiter_node<T> >( *this ) );
01615         return true;
01616     }
01617 
01619     /* override */ bool remove_predecessor( predecessor_type &src ) {
01620         my_predecessors.remove( src );
01621         return true;
01622     }
01623 };
01624 
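A minimal usage sketch, not part of the header source: it assumes a C++11 compiler for the lambdas and the illustrative names g, producer, limiter, and consumer. The limiter_node throttles the producer so that at most three items are in flight, and the consumer signals the limiter's decrement port as it finishes each item.

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    using namespace tbb::flow;
    graph g;

    int i = 0;
    source_node<int> producer( g,
        [&]( int &v ) -> bool {            // emit 0..9, then stop
            if ( i < 10 ) { v = i++; return true; }
            return false;
        }, false );                        // start inactive

    limiter_node<int> limiter( g, 3 );     // allow at most 3 items in flight

    function_node<int, continue_msg> consumer( g, unlimited,
        []( int v ) -> continue_msg {
            std::cout << v << std::endl;
            return continue_msg();
        } );

    make_edge( producer, limiter );
    make_edge( limiter, consumer );
    make_edge( consumer, limiter.decrement );  // completion re-opens the limiter

    producer.activate();
    g.wait_for_all();
    return 0;
}

Without the edge to limiter.decrement the internal count never drops, so the limiter would stop forwarding after its first three messages.
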
01625 #include "internal/_flow_graph_join_impl.h"
01626 
01627 using internal::reserving_port;
01628 using internal::queueing_port;
01629 using internal::tag_matching_port;
01630 using internal::input_port;
01631 using internal::tag_value;
01632 using internal::NO_TAG;
01633 
01634 template<typename OutputTuple, graph_buffer_policy JP=queueing> class join_node;
01635 
01636 template<typename OutputTuple>
01637 class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
01638 private:
01639     static const int N = std::tuple_size<OutputTuple>::value;
01640     typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
01641 public:
01642     typedef OutputTuple output_type;
01643     typedef typename unfolded_type::input_ports_type input_ports_type;
01644     join_node(graph &g) : unfolded_type(g) { }
01645     join_node(const join_node &other) : unfolded_type(other) {}
01646 };
01647 
01648 template<typename OutputTuple>
01649 class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
01650 private:
01651     static const int N = std::tuple_size<OutputTuple>::value;
01652     typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
01653 public:
01654     typedef OutputTuple output_type;
01655     typedef typename unfolded_type::input_ports_type input_ports_type;
01656     join_node(graph &g) : unfolded_type(g) { }
01657     join_node(const join_node &other) : unfolded_type(other) {}
01658 };
01659 
01660 // template for tag_matching join_node
01661 template<typename OutputTuple>
01662 class join_node<OutputTuple, tag_matching> : public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value,
01663       tag_matching_port, OutputTuple, tag_matching> {
01664 private:
01665     static const int N = std::tuple_size<OutputTuple>::value;
01666     typedef typename internal::unfolded_join_node<N, tag_matching_port, OutputTuple, tag_matching> unfolded_type;
01667 public:
01668     typedef OutputTuple output_type;
01669     typedef typename unfolded_type::input_ports_type input_ports_type;
01670     template<typename B0, typename B1>
01671     join_node(graph &g, B0 b0, B1 b1) : unfolded_type(g, b0, b1) { }
01672     template<typename B0, typename B1, typename B2>
01673     join_node(graph &g, B0 b0, B1 b1, B2 b2) : unfolded_type(g, b0, b1, b2) { }
01674     template<typename B0, typename B1, typename B2, typename B3>
01675     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3) : unfolded_type(g, b0, b1, b2, b3) { }
01676     template<typename B0, typename B1, typename B2, typename B3, typename B4>
01677     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4) : unfolded_type(g, b0, b1, b2, b3, b4) { }
01678     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5>
01679     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5) : unfolded_type(g, b0, b1, b2, b3, b4, b5) { }
01680     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6>
01681     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { }
01682     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7>
01683     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { }
01684     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8>
01685     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { }
01686     template<typename B0, typename B1, typename B2, typename B3, typename B4, typename B5, typename B6, typename B7, typename B8, typename B9>
01687     join_node(graph &g, B0 b0, B1 b1, B2 b2, B3 b3, B4 b4, B5 b5, B6 b6, B7 b7, B8 b8, B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { }
01688     join_node(const join_node &other) : unfolded_type(other) {}
01689 };
01690 
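A usage sketch, not part of the header source, with illustrative node names and assuming a C++11 toolchain: two queue_nodes feed a queueing join_node, and a function_node prints each joined tuple. The tag_matching specialization above differs mainly in construction, where it additionally takes one tag-extraction body per input port.

#include "tbb/flow_graph.h"
#include <iostream>

int main() {
    using namespace tbb::flow;
    graph g;

    queue_node<int>   ints( g );
    queue_node<float> floats( g );

    // queueing policy: a tuple is emitted once every input port holds an item
    join_node< std::tuple<int,float>, queueing > pairs( g );

    function_node< std::tuple<int,float>, continue_msg > print( g, unlimited,
        []( const std::tuple<int,float> &t ) -> continue_msg {
            std::cout << std::get<0>(t) << " " << std::get<1>(t) << std::endl;
            return continue_msg();
        } );

    make_edge( ints,   input_port<0>( pairs ) );
    make_edge( floats, input_port<1>( pairs ) );
    make_edge( pairs, print );

    ints.try_put( 1 );
    floats.try_put( 2.5f );
    g.wait_for_all();
    return 0;
}
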
01691 #if TBB_PREVIEW_GRAPH_NODES
01692 // or node
01693 #include "internal/_flow_graph_or_impl.h"
01694 
01695 template<typename InputTuple>
01696 class or_node : public internal::unfolded_or_node<InputTuple> {
01697 private:
01698     static const int N = std::tuple_size<InputTuple>::value;
01699 public:
01700     typedef typename internal::or_output_type<InputTuple>::type output_type;
01701     typedef typename internal::unfolded_or_node<InputTuple> unfolded_type;
01702     or_node(graph& g) : unfolded_type(g) { }
01703     // Copy constructor
01704     or_node( const or_node& other ) : unfolded_type(other) { }
01705 };
01706 #endif  // TBB_PREVIEW_GRAPH_NODES
01707 
01709 template< typename T >
01710 inline void make_edge( sender<T> &p, receiver<T> &s ) {
01711     p.register_successor( s );
01712 }
01713 
01715 template< typename T >
01716 inline void remove_edge( sender<T> &p, receiver<T> &s ) {
01717     p.remove_successor( s );
01718 }
01719 
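A sketch, not part of the header source, using illustrative node names to show make_edge and remove_edge at work: the broadcast_node forwards to both queues until one edge is removed.

#include "tbb/flow_graph.h"

int main() {
    using namespace tbb::flow;
    graph g;
    broadcast_node<int> b( g );
    queue_node<int> q1( g ), q2( g );

    make_edge( b, q1 );     // b forwards every message to q1
    make_edge( b, q2 );     // ... and to q2
    b.try_put( 42 );        // both queues receive 42

    remove_edge( b, q2 );   // q2 stops receiving
    b.try_put( 43 );        // only q1 receives 43

    g.wait_for_all();

    int v;
    while ( q1.try_get( v ) ) { /* v is 42, then 43 */ }
    return 0;
}
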
01721 template< typename Body, typename Node >
01722 Body copy_body( Node &n ) {
01723     return n.template copy_function_object<Body>();
01724 }
01725 
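A sketch, not part of the header source, of retrieving the function object stored in a node with copy_body; the counter functor and node names are illustrative.

#include "tbb/flow_graph.h"
#include <iostream>

struct counter {
    int calls;
    counter() : calls(0) {}
    int operator()( int v ) { ++calls; return v; }
};

int main() {
    using namespace tbb::flow;
    graph g;
    function_node<int,int> n( g, serial, counter() );

    n.try_put( 1 );
    n.try_put( 2 );
    g.wait_for_all();

    counter c = copy_body<counter>( n );   // copy of the body held by n
    std::cout << c.calls << std::endl;     // expected to print 2
    return 0;
}
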
01726 } // interface6
01727 
01728     using interface6::graph;
01729     using interface6::graph_node;
01730     using interface6::continue_msg;
01731     using interface6::sender;
01732     using interface6::receiver;
01733     using interface6::continue_receiver;
01734 
01735     using interface6::source_node;
01736     using interface6::function_node;
01737     using interface6::multifunction_node;
01738     using interface6::split_node;
01739     using interface6::internal::output_port;
01740 #if TBB_PREVIEW_GRAPH_NODES
01741     using interface6::or_node;
01742 #endif
01743     using interface6::continue_node;
01744     using interface6::overwrite_node;
01745     using interface6::write_once_node;
01746     using interface6::broadcast_node;
01747     using interface6::buffer_node;
01748     using interface6::queue_node;
01749     using interface6::sequencer_node;
01750     using interface6::priority_queue_node;
01751     using interface6::limiter_node;
01752     using namespace interface6::internal::graph_policy_namespace;
01753     using interface6::join_node;
01754     using interface6::input_port;
01755     using interface6::copy_body; 
01756     using interface6::make_edge; 
01757     using interface6::remove_edge; 
01758     using interface6::internal::NO_TAG;
01759     using interface6::internal::tag_value;
01760 
01761 } // flow
01762 } // tbb
01763 
01764 #endif // __TBB_flow_graph_H
