CnC
reduce.h
1 /* *******************************************************************************
2  * Copyright (c) 2007-2014, Intel Corporation
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are met:
6  *
7  * * Redistributions of source code must retain the above copyright notice,
8  * this list of conditions and the following disclaimer.
9  * * Redistributions in binary form must reproduce the above copyright
10  * notice, this list of conditions and the following disclaimer in the
11  * documentation and/or other materials provided with the distribution.
12  * * Neither the name of Intel Corporation nor the names of its contributors
13  * may be used to endorse or promote products derived from this software
14  * without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19  * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22  * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23  * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  ********************************************************************************/
27 
28 /*
29  Reductions for CnC, provided as a re-usable graph.
30 */
31 
32 #ifndef _CnC_REDUCE_H_
33 #define _CnC_REDUCE_H_
34 
35 #include <cnc/internal/cnc_stddef.h>
36 #include <cnc/internal/tbbcompat.h>
37 #include <tbb/concurrent_unordered_map.h>
38 #include <tbb/combinable.h>
39 #include <tbb/atomic.h>
40 #include <tbb/spin_rw_mutex.h>
41 #include <functional>
42 
43 namespace CnC
44 {
45 
46  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > class reduction;
47 
48  /// \defgroup reductions Asynchronous Reductions
49  /// @{
50 
51  /// Creates a graph for asynchronous reductions.
52  ///
53  /// Takes an input collection and reduces its content with a given
54  /// operation and selection mechanism. The computation is done
55  /// while new items arrive. Not all items need to be available to
56  /// start or make progress. Data input is provided by normal puts
57  /// into the input collection. The final reduced value for a
58  /// reduction is put into the output collection.
59  ///
60  /// Supports multiple concurrent reductions (with the same
61  /// operation) identified by a reduction id. For this, a selector
62  /// functor can be provided to tell which data-item goes to which
63  /// reduction (maps a data-tag to a reduction-id).
64  ///
65  /// The number reduced items per reduction-id needs to be provided
66  /// through a second input collection. You can signal no more
67  /// incoming values by putting a count < 0. Providing counts
68  /// late reduces communication and potentially improves performance.
69  ///
70  /// Each reduction is independent of other reductions and can
71  /// finish independently while others are still processing.
72  /// Connected graphs can get the reduced values with a normal
73  /// get-calls (using the desired reduction-id as the tag).
74  ///
75  /// The implementation is virtually lock-free. On distributed memory
76  /// the additional communication is also largely asynchronous.
77  ///
78  /// See also \ref reuse
79  ///
80  /// \param ctxt the context to which the graph belongs
81  /// \param name the name of this reduction graph instance
82  /// \param in input collection, every item that's put here is
83  /// applied to sel and potentially takes part in a reduction
84  /// \param cnt input collection; number of items for each reduction
85  /// expected to be put here (tag is reduction-id, value is count)
86  /// \param out output collection, reduced results are put here with tags as returned by sel
87  /// \param op the reduction operation:\n
88  /// IType (*)(const IType&, const IType&) const\n
89  /// usually a functor
90  /// \param idty the identity/neutral element for the given operation
91  /// \param sel functor, called once for every item put into "in":\n
92  /// bool (*)( const ITag & itag, OTag & otag ) const\n
93  /// must return true if given element should be used for a reduction, otherwise false;\n
94  /// if true, it must set otag to the tag of the reduction it participates in
95  template< typename Ctxt, typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
96  graph * make_reduce_graph( CnC::context< Ctxt > & ctxt, const std::string & name,
100  const ReduceOp & op,
101  const IType & idty,
102  const Select & sel )
103  {
104  return new reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >( ctxt, name, in, cnt, out, op, idty, sel );
105  }
106 
107 
108  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
109  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
    // The code below is the implementation; normally users shouldn't need to read it.
    // However, you might want to use this as a template for writing your own reduction.
112  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
113  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
114 
115 
#ifdef _DIST_CNC_
    // Message tags for distributed CnC: the first byte of every message this
    // graph sends; recv_msg dispatches on it.
    namespace DISTRED {
        const char BCASTCOUNT  = 93;
        const char GATHERCOUNT = 94;
        const char DONE        = 95;
        const char ALLDONE     = 96;
        const char VALUE       = 97;
        const char ALLVALUES   = 98;
    }
#endif

    // Status of each reduction (kept in red_tls::status).
    // The numeric order matters: code tests e.g. "status >= CNT_AVAILABLE".
    const int LOCAL         = 0;
    const int CNT_AVAILABLE = 1;
    const int BCAST_DONE    = 2;
    const int FINISH        = 3;
    const int DONE          = 4;

    // Shortcut macro for ITAC instrumentation. It expands to a function-local
    // static name string plus a VT_FUNC call, so it needs `m_reduce` in scope.
#ifdef CNC_WITH_ITAC
# define TRACE( _msg ) static std::string _t_n_( m_reduce->name() + _msg ); VT_FUNC( _t_n_.c_str() );
#else
# define TRACE( _msg )
#endif

    // Sentinel count: "count unknown" (initial red_tls::n) and "done" flag in
    // bcast_count. Only usable where the template parameter CType is in scope.
#define M1 static_cast< CType >( -1 )
143 
144  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
145  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
146 
147  // The actual reduction graph. A complex template construct; use
148  // make_reduce_graph to create a reduction graph. Only the constructor is
149  // public, everything else is private ("hidden").
150  //
151  // We use tbb::combinable for the local (shard memory) reduction part.
152  //
153  // On distributed memory we implement the following asyncrhonous protocol
154  // - As long as the count of a given reduction is unkown, we proceed as everything was local.
155  // - As soon as the count arrives, we do the following
156  // 0. assign ownership of the reduction to the count-providing process
157  // The owner controls gathering the distributed values when the count
158  // 1a. if count is a real count, it bcasts the count
159  // 1a if it's a done-flag (-1), we immediately move to 3.
160  // 2a. immediately followed by a gather of the counts (but not values)
161  // 2b. processes which are not the owners send a message for every additional item that was not gathered in 2a
162  // 3. once the onwer sees that all items for the reduction have arrived it bcast DONE
163  // 4. immediately followed by a gather of the values
164  // 5. when the owner collected all values, it use tbb::combinable and puts the final value
165  // As almost everything can happen at the same time, we use transactional-like
166  // operations implemented with atomic variables which guide each reduction through its states
167  // We implement our own bcast/gather tree individually for each owner (as its root)
168  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
169  class reduction : public CnC::graph//, CnC::Internal::no_copy
170  {
171  public:
175 
176  template< typename Ctxt >
177  reduction( CnC::context< Ctxt > & ctxt, const std::string & name, icoll_type & in, ccoll_type & c, ocoll_type & out, const ReduceOp & red, const IType & identity, const Select & sel );
178  ~reduction();
179 
180  // sometimes you can't tell number of reduced items until all computation is done.
181  // This call finalizes all reductions, no matter if a count was given or not.
182  void flush();
183 
184  // the implementation is "hidden"
185  private:
186  typedef reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select > reduce_type;
187  typedef tbb::spin_rw_mutex mutex_type;
188 
189  // thread-local storage per reduction
190  struct red_tls {
191  tbb::combinable< IType > val;
192  tbb::atomic< CType > nreduced;
193  tbb::atomic< CType > n;
194  mutex_type mtx;
195 #ifdef _DIST_CNC_
196  tbb::atomic< int > nCounts;
197  tbb::atomic< int > nValues;
198  int owner;
199 
200 #endif
201  tbb::atomic< int > status;
202  red_tls();
203  red_tls( const IType & v );
204  red_tls( const red_tls & rt );
205  private:
206  void operator=( const red_tls & rt );
207  };
208  typedef tbb::concurrent_unordered_map< OTag, red_tls > tls_map_type;
209 
210  // callback for collection a
211  struct on_item_put : public icoll_type::callback_type
212  {
213  on_item_put( reduce_type * r );
214  void on_put( const ITag & tag, const IType & val );
215 #ifdef _DIST_CNC_
216  void on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val );
217 #endif
218  void add_value( const typename tls_map_type::iterator & i, const IType & val ) const;
219  private:
220 
221  reduce_type * m_reduce;
222  };
223  friend struct on_item_put;
224 
225  // callback for count collection
226  struct on_count_put : public ccoll_type::callback_type
227  {
228  on_count_put( reduce_type * r );
229  void on_put( const OTag & otag, const CType & cnt );
230 #ifdef _DIST_CNC_
231  void on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner );
232  void on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner );
233  void on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt );
234 #endif
235  private:
236  reduce_type * m_reduce;
237  };
238  friend struct on_count_put;
239 
240  typename tls_map_type::iterator get( const OTag & tag );
241  bool try_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
242 #ifdef _DIST_CNC_
243  bool send_count( const OTag & otag, const typename tls_map_type::iterator & i, const int to, const bool always );
244  static int my_parent_for_root( const int root );
245  void try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
246  void try_send_or_put_all();
247  // home-grown bcast that uses a tree for each root
248  // at some point something like this should go into the CnC runtime
249  // returns the number of messages sent (0, 1 or 2)
250  int bcast( CnC::serializer * ser, int root );
251  int bcast_count( const OTag & tag, const CType & val, const int root );
252  virtual void recv_msg( serializer * ser );
253 #endif
254  icoll_type & m_in;
255  ccoll_type & m_cnt;
256  ocoll_type & m_out;
257  on_item_put * m_ondata;
258  on_count_put * m_oncount;
259  ReduceOp m_op;
260  Select m_sel;
261  tls_map_type m_reductions;
262  const IType m_identity;
263 #ifdef _DIST_CNC_
264  tbb::atomic< int > m_nDones;
265  bool m_alldone;
266 #endif
267  };
268 
269  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
270  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
271 
272  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
273  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls()
274  {
275  nreduced = 0;
276  n = M1;
277  status = LOCAL;
278 #ifdef _DIST_CNC_
279  nCounts = -1;
280  nValues = -1;
281  owner = -1;
282 #endif
283  }
284 
285  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
286 
287  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
288  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const IType & v )
289  {
290  val = v;
291  nreduced = 0;
292  n = M1;
293  status = LOCAL;
294 #ifdef _DIST_CNC_
295  nCounts = -1;
296  nValues = -1;
297  owner = -1;
298 #endif
299  }
300 
301  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
302 
303  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
304  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const red_tls & rt )
305  : val( rt.val ),
306  nreduced( rt.nreduced ),
307  n( rt.n ),
308  mtx(),
309 #ifdef _DIST_CNC_
310  nCounts( rt.nCounts ),
311  nValues( rt.nValues ),
312  owner( rt.owner ),
313 #endif
314  status( rt.status )
315  {}
316 
317  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
318  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
319 
320  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
321  template< typename Ctxt >
322  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::reduction( CnC::context< Ctxt > & ctxt, const std::string & name,
323  icoll_type & in, ccoll_type & c, ocoll_type & out,
324  const ReduceOp & red, const IType & identity, const Select & sel )
325  : CnC::graph( ctxt, name ),
326  m_in( in ),
327  m_cnt( c ),
328  m_out( out ),
329  m_ondata( new on_item_put( this ) ),
330  m_oncount( new on_count_put( this ) ),
331  m_op( red ),
332  m_sel( sel ),
333  m_reductions(),
334  m_identity( identity )
335  {
336  // callback objects must persist the lifetime of collections
337  m_in.on_put( m_ondata );
338  m_cnt.on_put( m_oncount );
339 #ifdef _DIST_CNC_
340  m_alldone = false;
341  m_nDones = -1;
342 #endif
343  }
344 
345  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
346 
347  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
348  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::~reduction()
349  {
350  // delete m_ondata;
351  // delete m_oncount;
352  }
353 
354  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
355 
356  // sometimes you can't tell number of reduced items until all computation is done.
357  // This call finalizes all reductions, no matter if a count was given or not.
358  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
359  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::flush()
360  {
361 #ifdef _DIST_CNC_
362  if( Internal::distributor::active() ) {
363  if( trace_level() > 0 ) {
364  Internal::Speaker oss(std::cout);
365  oss << this->name() << " flush: bcast ALLDONE";
366  }
367  CNC_ASSERT( CnC::tuner_base::myPid() == 0 );
368  m_alldone = true;
369  CnC::serializer * ser = this->new_serializer();
370  (*ser) & DISTRED::ALLDONE;
371  m_nDones = 1000; // protect from current messages
372  m_nDones += bcast( ser, 0 );
373  m_nDones -= 999;
374  try_send_or_put_all();
375  } else
376 #endif
377  for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
378  m_out.put( i->first, i->second.val.combine( m_op ) );
379  }
380  }
381 
382  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
383  typename reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::tls_map_type::iterator
384  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::get( const OTag & tag )
385  {
386  typename tls_map_type::iterator i = m_reductions.find( tag );
387  if( i == m_reductions.end() ) {
388  i = m_reductions.insert( typename tls_map_type::value_type( tag, red_tls() ) ).first;
389  }
390  return i;
391  }
392 
393  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
394 
395  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
396  bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
397  {
398  if( trace_level() > 2 ) {
399  Internal::Speaker oss(std::cout);
400  oss << this->name() << " try_put_value [";
401  cnc_format( oss, otag ) << "]"
402 #ifdef _DIST_CNC_
403  << " nValues " << i->second.nValues << " status " << i->second.status
404 #endif
405  ;
406  }
407  if( i->second.nreduced == i->second.n ) {
408  // setting n could go in parallel, it sets new n and then compares
409  mutex_type::scoped_lock _lock( i->second.mtx );
410  if( i->second.status != DONE ) {
411  this->m_out.put( otag, i->second.val.combine( this->m_op ) );
412  i->second.status = DONE;
413  return true;
414  }
415  }
416  return false;
417  }
418 
419  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
420 
421 #ifdef _DIST_CNC_
422  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
423  bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::send_count( const OTag & otag, const typename tls_map_type::iterator & i,
424  const int to, const bool always )
425  {
426  // we might have a count-put and the last value-put concurrently and all local
427  // only then the owner/to can be <0 or myself
428  if( to < 0 || to == CnC::tuner_base::myPid() ) return false;
429  CType _cnt = i->second.nreduced.fetch_and_store( 0 );
430  if( always || _cnt > 0 ) { // someone else might have transmitted the combined count already
431  CnC::serializer * ser = this->new_serializer();
432  (*ser) & DISTRED::GATHERCOUNT & otag & _cnt;
433  if( trace_level() > 2 ) {
434  Internal::Speaker oss(std::cout);
435  oss << this->name() << " send GATHERCOUNT [";
436  cnc_format( oss, otag ) << "] " << _cnt << " to " << to;
437  }
438  this->send_msg( ser, to );
439  return true;
440  }
441  return false;
442  }
443 
444  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
445 
446  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
447  int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::my_parent_for_root( const int root )
448  {
449  const int mpid = CnC::tuner_base::myPid();
450  const int nps = CnC::tuner_base::numProcs();
451  CNC_ASSERT( root != mpid );
452  int _p = ( ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) - 1 ) / 2 ) + root;
453  return _p % nps;
454  };
455 
456  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
457 
458  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
459  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
460  {
461  if( trace_level() > 2 ) {
462  Internal::Speaker oss(std::cout);
463  oss << this->name() << " try_send_or_put_value [";
464  cnc_format( oss, otag ) << "] nValues " << i->second.nValues << " status " << i->second.status;
465  }
466  if( i->second.nValues.fetch_and_decrement() == 1 ) {
467  if( i->second.owner == CnC::tuner_base::myPid() ) {
468  if( i->second.status == FINISH ) {
469  CNC_ASSERT( i->second.nreduced == i->second.n || i->second.n == M1 );
470  CNC_ASSERT( i->second.nValues == 0 && i->second.status == FINISH );
471  i->second.nreduced = i->second.n;
472  try_put_value( otag, i );
473  }
474  } else {
475  CnC::serializer * ser = this->new_serializer();
476  IType _val( i->second.val.combine( this->m_op ) );
477  (*ser) & DISTRED::VALUE & otag & _val;
478  const int to = my_parent_for_root( i->second.owner );
479  if( trace_level() > 2 ) {
480  Internal::Speaker oss(std::cout);
481  oss << this->name() << " send VALUE [";
482  cnc_format( oss, otag ) << "] ";
483  cnc_format( oss, _val ) << " to " << to;
484  }
485  this->send_msg( ser, to );
486  i->second.status = DONE;
487  }
488  }
489  }
490 
491  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
492 
493  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
494  int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast( CnC::serializer * ser, int root )
495  {
496  const int mpid = CnC::tuner_base::myPid();
497  const int nps = CnC::tuner_base::numProcs();
498  int _r1 = ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) + 1 ) * 2 - 1;
499  if( _r1 < nps ) {
500  if( _r1 < nps-1 ) {
501  // we have 2 children
502  _r1 = (root + _r1) % nps;
503  int _recvrs[2] = { _r1, (_r1+1)%nps };
504  this->bcast_msg( ser, _recvrs, 2 );
505  return 2;
506  } else {
507  // we have only a single child
508  _r1 = (root + _r1) % nps;
509  this->send_msg( ser, _r1 );
510  return 1;
511  }
512  }
513  delete ser;
514  // we are a leaf, nothing to be sent
515  return 0;
516  }
517 
518  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
519 
520  // home-grown bcast that uses a tree for each root
521  // at some point something like this should go into the CnC runtime
522  // returns the number of messages sent (0, 1 or 2)
523  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
524  int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast_count( const OTag & tag, const CType & val, const int root )
525  {
526  CnC::serializer * ser = this->new_serializer();
527  (*ser) & ( val != M1 ? DISTRED::BCASTCOUNT : DISTRED::DONE ) & tag & root;
528  if( val != M1 ) (*ser) & val;
529  int _c = bcast( ser, root );
530  if( _c && trace_level() > 2 ) {
531  Internal::Speaker oss(std::cout);
532  oss << this->name() << " bcast " << (val != M1 ? " BCASTCOUNT [" : " DONE [");
533  cnc_format( oss, tag ) << "]";
534  }
535  return _c;
536  };
537 
538  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
539 
540  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
541  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_all()
542  {
543  CNC_ASSERT( m_alldone );
544  if( trace_level() > 2 ) {
545  Internal::Speaker oss(std::cout);
546  oss << this->name() << " try_send_or_put_all " << m_nDones;
547  }
548  if( --m_nDones == 0 ) {
549  if( CnC::tuner_base::myPid() == 0 ) {
550  for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
551  m_out.put( i->first, i->second.val.combine( m_op ) );
552  }
553  } else {
554  int _n = m_reductions.size();
555  CnC::serializer * ser = this->new_serializer();
556  (*ser) & DISTRED::ALLVALUES & _n;
557  for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
558  IType _val( i->second.val.combine( this->m_op ) );
559  (*ser) & i->first & _val;
560  }
561  const int to = my_parent_for_root( 0 );
562  if( trace_level() > 2 ) {
563  Internal::Speaker oss(std::cout);
564  oss << this->name() << " send ALLVALUES to " << to;
565  }
566  this->send_msg( ser, to );
567  }
568  }
569  }
570 
571  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
572 
573  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
574  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::recv_msg( serializer * ser )
575  {
576  char _op;
577  (*ser) & _op;
578 
579  switch( _op ) {
580  case DISTRED::GATHERCOUNT : {
581  OTag _tag;
582  CType _cnt;
583  (*ser) & _tag & _cnt;
584  if( trace_level() > 2 ) {
585  Internal::Speaker oss(std::cout);
586  oss << this->name() << " recvd GATHERCOUNT [";
587  cnc_format( oss, _tag ) << "] " << _cnt;
588  }
589  m_oncount->on_gatherCount( _tag, get( _tag ), _cnt );
590  break;
591  }
592  case DISTRED::BCASTCOUNT : {
593  OTag _tag;
594  CType _cnt;
595  int _owner;
596  (*ser) & _tag & _owner & _cnt ;
597  if( trace_level() > 2 ) {
598  Internal::Speaker oss(std::cout);
599  oss << this->name() << " recvd BCASTCOUNT [";
600  cnc_format( oss, _tag ) << "] " << _cnt << " " << _owner;
601  }
602  m_oncount->on_bcastCount( _tag, get( _tag ), _cnt, _owner );
603  break;
604  }
605  case DISTRED::VALUE : {
606  OTag _tag;
607  IType _val;
608  (*ser) & _tag & _val;
609  if( trace_level() > 2 ) {
610  Internal::Speaker oss(std::cout);
611  oss << this->name() << " recvd VALUE [";
612  cnc_format( oss, _tag ) << "] ";
613  cnc_format( oss, _val );
614  }
615  m_ondata->on_value( _tag, get( _tag ), _val );
616  break;
617  }
618  case DISTRED::DONE : {
619  OTag _tag;
620  int _owner;
621  (*ser) & _tag & _owner;
622  if( trace_level() > 2 ) {
623  Internal::Speaker oss(std::cout);
624  oss << this->name() << " recvd DONE [";
625  cnc_format( oss, _tag ) << "] " << _owner;
626  }
627  m_oncount->on_done( _tag, get( _tag ), _owner );
628  break;
629  }
630  case DISTRED::ALLDONE : {
631  if( trace_level() > 2 ) {
632  Internal::Speaker oss(std::cout);
633  oss << this->name() << " recvd ALLDONE";
634  }
635  m_alldone = true;
636  CnC::serializer * ser = this->new_serializer();
637  (*ser) & DISTRED::ALLDONE;
638  m_nDones = 1000; // protect from current messages
639  m_nDones += bcast( ser, 0 );
640  m_nDones -= 999;
641  try_send_or_put_all();
642  break;
643  }
644  default:
645  case DISTRED::ALLVALUES : {
646  CNC_ASSERT( m_alldone = true );
647  int _n;
648  (*ser) & _n;
649  if( trace_level() > 2 ) {
650  Internal::Speaker oss(std::cout);
651  oss << this->name() << " recvd ALLVALUES " << _n;
652  }
653  while( _n-- ) {
654  OTag _tag;
655  IType _val;
656  (*ser) & _tag & _val;
657  const typename tls_map_type::iterator i = get( _tag );
658  m_ondata->add_value( i, _val );
659  }
660  try_send_or_put_all();
661  break;
662  }
663  CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
664  }
665  }
666 
667 #endif // _DIST_CNC_
668 
669  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
670  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
671 
672  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
673  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_item_put( reduce_type * r )
674  : m_reduce( r )
675  {}
676 
677  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
678 
679  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
680  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::add_value( const typename tls_map_type::iterator & i, const IType & val ) const
681  {
682  bool _exists;
683  IType & _rval = i->second.val.local( _exists );
684  _rval = m_reduce->m_op( _exists ? _rval : m_reduce->m_identity, val );
685  }
686 
687  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
688 
#ifdef _DIST_CNC_
    // Handles a VALUE message from a child in the gather tree: merge the
    // child's partial value, then forward it or put the final result.
    template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
    void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val )
    {
        TRACE( "::on_value" );
        this->add_value( i, val );
        m_reduce->try_send_or_put_value( otag, i );
    }
#endif
698 
699  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
700 
701  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
702  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_put( const ITag & tag, const IType & val )
703  {
704  TRACE( "::on_data" );
705  OTag otag;
706  if( m_reduce->m_sel( tag, otag ) ) {
707  typename tls_map_type::iterator i = m_reduce->get( otag );
708  add_value( i, val );
709  CType _n = ++i->second.nreduced;
710  if( m_reduce->trace_level() > 0 ) {
711  Internal::Speaker oss(std::cout);
712  oss << m_reduce->name() << " on_put [";
713  cnc_format( oss, tag ) << "] for [";
714  cnc_format( oss, otag ) << "] ";
715  cnc_format( oss, val );
716  oss << " nred now " << _n << "/" << i->second.n
717 #ifdef _DIST_CNC_
718  << " owner " << i->second.owner << " status " << i->second.status << " nCounts " << i->second.nCounts
719 #endif
720  ;
721  }
722 #ifdef _DIST_CNC_
723  // not yet done, might need some communication
724  if( Internal::distributor::active() ) {
725  // just in case, test if all was local
726  // 2 cases
727  // 1.: local phase or we are the owner
728  // 2.: we know the red-count, but the reduction is not yet done.
729  if( i->second.status >= CNT_AVAILABLE ) {
730  if( CnC::tuner_base::myPid() == i->second.owner ) {
731  // the owner's put was the final put, let's trigger the done propagation
732  if( _n == i->second.n && i->second.nCounts <= 0 ) m_reduce->m_oncount->on_done( otag, i, CnC::tuner_base::myPid() );
733  } else {
734  // 2nd case
735  // We have to report the new count (not the value)
736  // we use an atomic variable to count, so we do not lose contributions
737  // we don't report the value, so nothing gets inconsistent
738  // the value is transmitted only when we know nothing more comes in (elsewhere)
739  m_reduce->send_count( otag, i, i->second.owner, false );
740  }
741  } // else means we are still collecting locally (case 1)
742  // }
743  } else
744 #endif
745  m_reduce->try_put_value( otag, i );
746  } else if( m_reduce->trace_level() > 0 ) {
747  Internal::Speaker oss(std::cout);
748  oss << m_reduce->name() << " [";
749  cnc_format( oss, tag ) << "] was not selected";
750  }
751  }
752 
753  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
754  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
755 
756  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
757  reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_count_put( reduce_type * r )
758  : m_reduce( r )
759  {}
760 
761  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
762 
763 #ifdef _DIST_CNC_
// on_done: start the "done" phase of reduction `otag` (distributed mode only).
// Records `owner` as the reduction's root process.
//  - On a non-root process the body always runs.
//  - On the root it runs only if status can be atomically moved from CNT_AVAILABLE to
//    BCAST_DONE. Several paths call on_done (on_item_put::on_put, on_gatherCount,
//    on_put), and this check stops concurrent callers on the root from all running it.
// The body forwards the done-signal (count M1) through the broadcast tree, moves status
// BCAST_DONE -> FINISH and sets nValues to the number of outstanding value
// contributions plus one for this process. It then tries to send or put the value.
// NOTE(review): compare_and_swap is assumed to follow the tbb convention
// (new value, comparand) and to return the previous value. The assertion on
// `_tmp == LOCAL` in on_bcastCount is consistent with this.
764  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
765  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner )
766  {
767  TRACE( "::on_done" );
768  if( m_reduce->trace_level() > 2 ) {
769  Internal::Speaker oss(std::cout);
770  oss << m_reduce->name() << " on_done for [";
771  cnc_format( oss, otag ) << "] nred now " << i->second.nreduced << "/" << i->second.n
772  << " owner " << i->second.owner << " status " << i->second.status;
773  }
774  i->second.owner = owner;
// Non-root processes always proceed. On the root, only the caller whose CAS observes
// CNT_AVAILABLE proceeds.
775  if( owner != CnC::tuner_base::myPid() || i->second.status.compare_and_swap( BCAST_DONE, CNT_AVAILABLE ) == CNT_AVAILABLE ) {
776  // "forward" through bcast-tree (we are using our home-grown bcast!)
// Bias nValues by 1000 so it cannot reach zero while the broadcast below is still being
// set up. NOTE(review): presumably other code decrements nValues as child values arrive.
// Confirm that the bias exceeds any decrements that can race with this function.
777  i->second.nValues = 1000;
// bcast_count presumably returns the number of children the signal was forwarded to
// (TODO confirm).
778  i->second.nValues += m_reduce->bcast_count( otag, M1, owner );
// Root only: BCAST_DONE -> FINISH must succeed here. On non-roots the CAS result is not checked.
779  int _tmp = i->second.status.compare_and_swap( FINISH, BCAST_DONE );
780  CNC_ASSERT( owner != CnC::tuner_base::myPid() || _tmp == BCAST_DONE );
781  // we leave one more for ourself (no contribution to value)
// Net effect of the two adjustments: the 1000 bias is removed and +1 is kept for this process.
782  i->second.nValues -= 999;
783  // we might be a leaf or all the values came back between the bcast and now
784  m_reduce->try_send_or_put_value( otag, i );
785  }
786  }
787 
788  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
789 
// on_bcastCount: receive, or at the root originate, the final expected count `cnt` for
// reduction `otag`, whose root process is `owner` (distributed mode only).
// Steps:
//  1. Store `owner` and `n = cnt`.
//  2. Atomically move status LOCAL -> CNT_AVAILABLE, asserting the previous status was LOCAL.
//  3. Forward the count down the broadcast tree.
//  4. Initialize nCounts to one (this process) plus whatever bcast_count returns.
//     NOTE(review): presumably this is the number of children whose count replies are
//     outstanding; confirm.
//  5. A non-root process with nothing outstanding (a leaf) calls send_count towards its
//     parent, my_parent_for_root(owner), right away.
// `cnt` must not be the done-flag sentinel M1.
790  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
791  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner )
792  {
793  TRACE( "::on_bcast" );
794  CNC_ASSERT( cnt != M1 );
795  i->second.owner = owner;
796  i->second.n = cnt;
797 
// Leave the local collection phase: the expected count is now known on this process.
798  int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
799  CNC_ASSERT( _tmp == LOCAL );
800  // "forward" through bcast-tree (we are using our home-grown bcast!)
// Start at 1 for this process so nCounts cannot hit zero before the children are added.
801  i->second.nCounts = 1;
802  i->second.nCounts += m_reduce->bcast_count( otag, cnt, owner );
803  // if we are a leaf -> trigger gather to root
// The decrement releases this process's own slot. The root never sends to a parent.
804  if( --i->second.nCounts == 0 && owner != CnC::tuner_base::myPid() ) {
805  m_reduce->send_count( otag, i, my_parent_for_root( owner ), true );
806  }
807  }
808 
809  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
810 
// on_gatherCount: accumulate a reduced-count contribution `cnt` that comes back up the
// broadcast tree for reduction `otag` (distributed mode only).
// Adds `cnt` to nreduced and decrements nCounts. Once nothing is outstanding
// (nCounts <= 0):
//  - on the root (owner == this process), the done phase starts if the total reduced
//    count equals the expected count n;
//  - on any other process, send_count is called towards the parent.
// NOTE(review): on the root, nCounts may drop below zero ("extra counts"). This is
// presumably because non-owners also send counts straight to the owner from
// on_item_put::on_put; confirm.
811  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
812  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt )
813  {
814  TRACE( "::on_gather" );
815  CNC_ASSERT( cnt != M1 );
816  i->second.nreduced += cnt;
817  if( m_reduce->trace_level() > 2 ) {
818  Internal::Speaker oss(std::cout);
819  oss << m_reduce->name() << " on_gatherCount [";
820  cnc_format( oss, otag ) << "] now " << i->second.nreduced << "/" << i->second.n << " nCounts " << i->second.nCounts;
821  }
822  // at root, we might need to trigger done phase
823  if( --i->second.nCounts <= 0 ) { // there might be extra counts
824  if( i->second.owner == CnC::tuner_base::myPid() ) {
// All contributions accounted for: start the done phase from the root.
825  if( i->second.n == i->second.nreduced ) {
826  on_done( otag, i, CnC::tuner_base::myPid() );
827  }
828  } else {
829  CNC_ASSERT( i->second.nCounts == 0 ); // extra counts occur only on root
// Pass this subtree's result one level up towards the root.
830  m_reduce->send_count( otag, i, my_parent_for_root( i->second.owner ), true );
831  }
832  }
833  }
834 #endif // _DIST_CNC_
835 
836  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
837 
838  template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
839  void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_put( const OTag & otag, const CType & cnt )
840  {
841  TRACE( "::on_cnt" );
842  typename tls_map_type::iterator i = m_reduce->get( otag );
843 
844 #ifdef _DIST_CNC_
845  if( Internal::distributor::active() ) {
846  if( cnt != M1 ) { //is this a normal count?
847  i->second.n = cnt;
848  // just in case all was local, we try to finish
849  if( ! m_reduce->try_put_value( otag, i ) ) {
850  on_bcastCount( otag, i, cnt, CnC::tuner_base::myPid() );
851  }
852  } else { // this is the done flag
853  // we have no idea what was put remotely
854  // -> we trigger the final gather phase
855  int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
856  CNC_ASSERT( _tmp == LOCAL );
857  i->second.nCounts = 0;
858  on_done( otag, i, CnC::tuner_base::myPid() );
859  }
860  } else
861 #endif
862  {
863  if( cnt != M1 ) i->second.n = cnt;
864  else i->second.n = i->second.nreduced;
865  m_reduce->try_put_value( otag, i );
866  }
867  }
868 
869  // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
870 
871 } // namespace CnC
872 
873 /// @}
874 
875 #endif //_CnC_REDUCE_H_
CnC API.
Definition: cnc.h:49
Base class for defining and using CnC (sub-)graphs.
Definition: cnc.h:429
CnC context bringing together collections (for steps, items and tags).
Definition: cnc.h:54
graph * make_reduce_graph(CnC::context< Ctxt > &ctxt, const std::string &name, CnC::item_collection< ITag, IType, ITuner > &in, CnC::item_collection< OTag, CType, CTuner > &cnt, CnC::item_collection< OTag, IType, OTuner > &out, const ReduceOp &op, const IType &idty, const Select &sel)
Definition: reduce.h:96
static int numProcs()
Definition: default_tuner.h:93
Handles serialization of data objects.
Definition: serializer.h:348
An item collection is a mapping from tags to items.
Definition: cnc.h:57
static int myPid()
Definition: default_tuner.h:87