CnC
join.h
1 /* *******************************************************************************
2  * Copyright (c) 2007-2014, Intel Corporation
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  *
7  * * Redistributions of source code must retain the above copyright notice,
8  * this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of Intel Corporation nor the names of its contributors
13  * may be used to endorse or promote products derived from this software
14  * without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  ********************************************************************************/
27 
28 /*
29  Joins/corss products for CnC, provided as a re-usable graph.
30 */
31 
32 #ifndef _CnC_JOIN_H_
33 #define _CnC_JOIN_H_
34 
35 #include <cnc/internal/cnc_stddef.h>
36 #include <cnc/internal/tbbcompat.h>
37 #include <tbb/queuing_mutex.h>
38 #include <set>
39 
40 namespace CnC
41 {
42 
43  template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > class join;
44 
45  /// Returns a graph that joins 2 tag-collections into a third one.
46  /// Continuously produces the join product of the 2 input tag-collections
47  /// and puts it into the output tag-collection.
48  ///
49  /// Accepts any types; only requires that a an output tag-type
50  /// provides a constructor which accepts (taga, tagb) to construct
51  /// a joined tag from tag a and b.
52  ///
53  /// On distributed memory duplicate output tags might be produced.
54  /// To avoid duplicate step execution use tag-preservation on the
55  /// output tag-collection (CnC::preserve_tuner) and suitable
56  /// distribution functions on the prescribed step-collection
57  /// (CnC::step_tuner).
58  ///
59  /// \param ctxt the context to which the graph belongs
60  /// \param name the name of this join graph instance
61  /// \param a first input collection
62  /// \param b second input collection
63  /// \param c output collection
64  template< typename Ctxt, typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
65  graph * make_join_graph( CnC::context< Ctxt > & ctxt, const std::string & name,
69  {
70  return new join< TagA, TunerA, TagB, TunerB, TagC, TunerC >( ctxt, name, a, b, c );
71  }
72 
73  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
74  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
75 
76  // The actual join graph. A complex template construct; use
77  // make_join_graph to create a join graph. Only the constructor is
78  // public, everything else is private ("hidden").
79  //
80  // When ever a new tag arrives, it gets joined with all existing
81  // tags in the other collection. The actual join is protected by a
82  // mutex. This makes it safe on a single process.
83  //
84  // We cannot rely on the collection to keep tags because we
85  // currently have no thread-safe iteration facility. Hence we use
86  // std::set to keep them privately in the graph.
87  //
88  // We need explicit handling of distributed memory. We keep the
89  // tags locally where they are put and just send a bcast message
90  // when a new tag arrives. This can produce duplicate output tags.
91  template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
92  class join : public CnC::graph, CnC::Internal::no_copy
93  {
94  typedef tbb::queuing_mutex mutex_type;
95  typedef std::set< TagA > seta_type;
96  typedef std::set< TagB > setb_type;
97  typedef CnC::tag_collection< TagA, TunerA > colla_type;
98  typedef CnC::tag_collection< TagB, TunerB > collb_type;
99  typedef CnC::tag_collection< TagC, TunerC > collc_type;
100  typedef join< TagA, TunerA, TagB, TunerB, TagC, TunerC > join_type;
101  enum { JOINA = 'A', JOINB = 'B' };
102  friend struct on_a_put;
103  friend struct on_b_put;
104 
105  public:
106  template< typename Ctxt >
107  join( CnC::context< Ctxt > & ctxt, const std::string & name, colla_type & a, collb_type & b, collc_type & c )
108  : CnC::graph( ctxt, name ),
109  m_a( a ),
110  m_b( b ),
111  m_c( c ),
112  m_seta(),
113  m_setb(),
114  m_mutex()
115  {
116  // callback objects must persist the lifetime of collections
117  m_a.on_put( new on_a_put( this ) );
118  m_b.on_put( new on_b_put( this ) );
119  }
120 
121  // the implementation is "hidden"
122  private:
123  // join a given tag with all existing tags in the other set
124  template< typename Tag, typename Set, typename joiner >
125  void join_one( const Tag & t, const Set & set, const joiner & j )
126  {
127  // in a more advanced version we can go parallel by using parallel_for or a range
128  for( typename Set::iterator i=set.begin(); i!=set.end(); ++i ) {
129  m_c.put( j( t, *i ) );
130  }
131  }
132 
133  // join a b-tag with a a-tag
134  struct join_ba
135  {
136  TagC operator()( const TagB & b, const TagA & a ) const {
137  return TagC( a, b );
138  }
139  };
140  // join a a-tag with a b-tag
141  struct join_ab
142  {
143  TagC operator()( const TagA & a, const TagB & b ) const {
144  return TagC( a, b );
145  }
146  };
147  typedef join_ab join_ab_type;
148  typedef join_ba join_ba_type;
149 
150  template< typename Tag >
151  void send_tag( const Tag & tag, const char op )
152  {
153 #ifdef _DIST_CNC_
154  if( Internal::distributor::active() ) {
155  if( this->trace_level() > 2 ) {
156  Internal::Speaker oss(std::cout);
157  oss << this->name() << "::send_tag JOIN" << (op==JOINA?"A":"B") << " [";
158  cnc_format( oss, tag );
159  oss << "]";
160  }
161  CnC::serializer * ser = this->new_serializer();
162  (*ser) & op & tag;
163  this->bcast_msg( ser );
164  }
165 #endif
166  }
167 
168  // callback for collection a
169  struct on_a_put : public colla_type::callback_type
170  {
171  on_a_put( join_type * j ) : m_join( j ) {}
172 
173  void on_put( const TagA & tag )
174  {
175  if( m_join->trace_level() > 0 ) {
176  Internal::Speaker oss(std::cout);
177  oss << m_join->name() << "::on_put_a <";
178  cnc_format( oss, tag );
179  oss << ">";
180  }
181  m_join->send_tag( tag, JOINA );
182  mutex_type::scoped_lock _lock( m_join->m_mutex );
183  m_join->join_one( tag, m_join->m_setb, join_ab_type() );
184  m_join->m_seta.insert( tag );
185  }
186  private:
187  join_type * m_join;
188  };
189 
190  // callback for collection b
191  struct on_b_put : public collb_type::callback_type
192  {
193  on_b_put( join_type * j ) : m_join( j ) {}
194 
195  void on_put( const TagB & tag )
196  {
197  if( m_join->trace_level() > 0 ) {
198  Internal::Speaker oss(std::cout);
199  oss << m_join->name() << "::on_put_b <";
200  cnc_format( oss, tag );
201  oss << ">";
202  }
203  m_join->send_tag( tag, JOINB );
204  mutex_type::scoped_lock _lock( m_join->m_mutex );
205  m_join->join_one( tag, m_join->m_seta, join_ba_type() );
206  m_join->m_setb.insert( tag );
207  }
208  private:
209  join_type * m_join;
210  };
211 
212 #ifdef _DIST_CNC_
213  virtual void recv_msg( serializer * ser )
214  {
215  char _op;
216  (*ser) & _op;
217  switch( _op ) {
218  case JOINA : {
219  TagA _tag;
220  (*ser) & _tag;
221  if( this->trace_level() > 2 ) {
222  Internal::Speaker oss(std::cout);
223  oss << this->name() << "::recv_msg JOINA <";
224  cnc_format( oss, _tag );
225  oss << ">";
226  }
227  mutex_type::scoped_lock _lock( m_mutex );
228  join_one( _tag, m_setb, join_ab_type() );
229  break;
230  }
231  case JOINB : {
232  CNC_ASSERT_MSG( _op == JOINB, "Unexpected message tag" );
233  TagB _tag;
234  (*ser) & _tag;
235  if( this->trace_level() > 2 ) {
236  Internal::Speaker oss(std::cout);
237  oss << this->name() << "::recv_msg JOINB <";
238  cnc_format( oss, _tag );
239  oss << ">";
240  }
241  mutex_type::scoped_lock _lock( m_mutex );
242  join_one( _tag, m_seta, join_ba_type() );
243  break;
244  }
245  default :
246  CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
247  }
248  }
249 #endif
250 
251  colla_type & m_a;
252  collb_type & m_b;
253  collc_type & m_c;
254  mutex_type m_mutex;
255  seta_type m_seta;
256  setb_type m_setb;
257  };
258 
259 
260 } // namespace CnC
261 
262 #endif //_CnC_JOIN_H_
CnC API.
Definition: cnc.h:49
A tag collection is a set of tags of the same type. It is used to prescribe steps. By default, tags are not stored.
Definition: cnc.h:56
Internal::tag_collection_base< Tag, Tuner >::callback_type callback_type
Definition: cnc.h:219
Base class for defining and using CnC (sub-)graphs.
Definition: cnc.h:429
CnC context bringing together collections (for steps, items and tags).
Definition: cnc.h:54
graph * make_join_graph(CnC::context< Ctxt > &ctxt, const std::string &name, CnC::tag_collection< TagA, TunerA > &a, CnC::tag_collection< TagB, TunerB > &b, CnC::tag_collection< TagC, TunerC > &c)
Definition: join.h:65
Handles serilialization of data-objects.
Definition: serializer.h:348