CnC
|
00001 /* ******************************************************************************* 00002 * Copyright (c) 2007-2014, Intel Corporation 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * 00007 * * Redistributions of source code must retain the above copyright notice, 00008 * this list of conditions and the following disclaimer. 00009 * * Redistributions in binary form must reproduce the above copyright 00010 * notice, this list of conditions and the following disclaimer in the 00011 * documentation and/or other materials provided with the distribution. 00012 * * Neither the name of Intel Corporation nor the names of its contributors 00013 * may be used to endorse or promote products derived from this software 00014 * without specific prior written permission. 00015 * 00016 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00017 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00018 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00019 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE 00020 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00021 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 00022 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00023 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 00024 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 00025 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00026 ********************************************************************************/ 00027 00028 /* 00029 Reductions for CnC, provided as a re-usable graph. 
00030 */ 00031 00032 #ifndef _CnC_REDUCE_H_ 00033 #define _CnC_REDUCE_H_ 00034 00035 #include <cnc/internal/cnc_stddef.h> 00036 #include <cnc/internal/tbbcompat.h> 00037 #include <tbb/concurrent_unordered_map.h> 00038 #include <tbb/combinable.h> 00039 #include <tbb/atomic.h> 00040 #include <tbb/spin_rw_mutex.h> 00041 #include <functional> 00042 00043 namespace CnC 00044 { 00045 00046 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > class reduction; 00047 00048 /// \defgroup reductions Asynchronous Reductions 00049 /// @{ 00050 00051 /// Creates a graph for asynchronous reductions. 00052 /// 00053 /// Takes an input collection and reduces its content with a given 00054 /// operation and selection mechanism. The computation is done 00055 /// while new items arrive. Not all items need to be available to 00056 /// start or make progress. Data input is provided by normal puts 00057 /// into the input collection. The final reduced value for a 00058 /// reduction is put into the output collection. 00059 /// 00060 /// Supports multiple concurrent reductions (with the same 00061 /// operation) identified by a reduction id. For this, a selector 00062 /// functor can be provided to tell which data-item goes to which 00063 /// reduction (maps a data-tag to a reduction-id). 00064 /// 00065 /// The number reduced items per reduction-id needs to be provided 00066 /// through a second input collection. You can signal no more 00067 /// incoming values by putting a count < 0. Providing counts 00068 /// late reduces communication and potentially improves performance. 00069 /// 00070 /// Each reduction is independent of other reductions and can 00071 /// finish independently while others are still processing. 00072 /// Connected graphs can get the reduced values with a normal 00073 /// get-calls (using the desired reduction-id as the tag). 
00074 /// 00075 /// The implementation is virtually lock-free. On distributed memory 00076 /// the additional communication is also largely asynchronous. 00077 /// 00078 /// See also \ref reuse 00079 /// 00080 /// \param ctxt the context to which the graph belongs 00081 /// \param name the name of this reduction graph instance 00082 /// \param in input collection, every item that's put here is 00083 /// applied to sel and potentially takes part in a reduction 00084 /// \param cnt input collection; number of items for each reduction 00085 /// expected to be put here (tag is reduction-id, value is count) 00086 /// \param out output collection, reduced results are put here with tags as returned by sel 00087 /// \param op the reduction operation:\n 00088 /// IType (*)(const IType&, const IType&) const\n 00089 /// usually a functor 00090 /// \param idty the identity/neutral element for the given operation 00091 /// \param sel functor, called once for every item put into "in":\n 00092 /// bool (*)( const ITag & itag, OTag & otag ) const\n 00093 /// must return true if given element should be used for a reduction, otherwise false;\n 00094 /// if true, it must set otag to the tag of the reduction it participates in 00095 template< typename Ctxt, typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00096 graph * make_reduce_graph( CnC::context< Ctxt > & ctxt, const std::string & name, 00097 CnC::item_collection< ITag, IType, ITuner > & in, 00098 CnC::item_collection< OTag, CType, CTuner > & cnt, 00099 CnC::item_collection< OTag, IType, OTuner > & out, 00100 const ReduceOp & op, 00101 const IType & idty, 00102 const Select & sel ) 00103 { 00104 return new reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >( ctxt, name, in, cnt, out, op, idty, sel ); 00105 } 00106 00107 00108 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 
// %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
// The below is the implementation, normally users shouldn't need to read it.
// However, you might want to use this as a template for writing your own reduction.
// %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
// %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%


#ifdef _DIST_CNC_
// message tags for dist CnC; the tag is the first value serialized into every
// message of the reduction graph and is what recv_msg dispatches on
namespace DISTRED {
static const char BCASTCOUNT = 93;   // bcast_count: a real count is broadcast (tag, root, count)
static const char GATHERCOUNT = 94;  // send_count: number of locally reduced items, sent to the owner
static const char DONE = 95;         // bcast_count: done-flag (count == M1) is broadcast (tag, root)
static const char ALLDONE = 96;      // flush: all reductions are to be finalized
static const char VALUE = 97;        // try_send_or_put_value: combined value sent to the tree parent
static const char ALLVALUES = 98;    // try_send_or_put_all: all combined values sent to the tree parent
}
#endif

// status of each reduction (stored in red_tls::status)
static const int LOCAL = 0;          // initial state set by the red_tls constructors
static const int CNT_AVAILABLE = 1;  // entered from LOCAL in on_bcastCount
static const int BCAST_DONE = 2;     // entered from CNT_AVAILABLE in on_done
static const int FINISH = 3;         // entered from BCAST_DONE in on_done
static const int DONE = 4;           // set once the value was put or sent

// shortcut macro for ITAC instrumentation
// NOTE: the ITAC variant expands to code that reads m_reduce, so it can only
// be used where a member/variable of that name is in scope.
#ifdef CNC_WITH_ITAC
# define TRACE( _m ) static std::string _t_n_( m_reduce->name() + _m ); VT_FUNC( _t_n_.c_str() );
#else
# define TRACE( _m )
#endif

// "minus one" in the count type; used as the initial value of red_tls::n and
// as the done-flag in bcast_count. Requires a type named CType in scope.
#define M1 static_cast< CType >( -1 )

// %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
// %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

// The actual reduction graph. A complex template construct; use
// make_reduce_graph to create a reduction graph. Only the constructor,
// the destructor and flush() are public, everything else is private ("hidden").
//
// We use tbb::combinable for the local (shared memory) reduction part.
//
// On distributed memory we implement the following asynchronous protocol
// - As long as the count of a given reduction is unknown, we proceed as if everything was local.
00155 // - As soon as the count arrives, we do the following 00156 // 0. assign ownership of the reduction to the count-providing process 00157 // The owner controls gathering the distributed values when the count 00158 // 1a. if count is a real count, it bcasts the count 00159 // 1a if it's a done-flag (-1), we immediately move to 3. 00160 // 2a. immediately followed by a gather of the counts (but not values) 00161 // 2b. processes which are not the owners send a message for every additional item that was not gathered in 2a 00162 // 3. once the onwer sees that all items for the reduction have arrived it bcast DONE 00163 // 4. immediately followed by a gather of the values 00164 // 5. when the owner collected all values, it use tbb::combinable and puts the final value 00165 // As almost everything can happen at the same time, we use transactional-like 00166 // operations implemented with atomic variables which guide each reduction through its states 00167 // We implement our own bcast/gather tree individually for each owner (as its root) 00168 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00169 class reduction : public CnC::graph//, CnC::Internal::no_copy 00170 { 00171 public: 00172 typedef CnC::item_collection< ITag, IType, ITuner > icoll_type; 00173 typedef CnC::item_collection< OTag, CType, CTuner > ccoll_type; 00174 typedef CnC::item_collection< OTag, IType, OTuner > ocoll_type; 00175 00176 template< typename Ctxt > 00177 reduction( CnC::context< Ctxt > & ctxt, const std::string & name, icoll_type & in, ccoll_type & c, ocoll_type & out, const ReduceOp & red, const IType & identity, const Select & sel ); 00178 ~reduction(); 00179 00180 // sometimes you can't tell number of reduced items until all computation is done. 00181 // This call finalizes all reductions, no matter if a count was given or not. 
00182 void flush(); 00183 00184 // the implementation is "hidden" 00185 private: 00186 typedef reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select > reduce_type; 00187 typedef tbb::spin_rw_mutex mutex_type; 00188 00189 // thread-local storage per reduction 00190 struct red_tls { 00191 tbb::combinable< IType > val; 00192 tbb::atomic< CType > nreduced; 00193 tbb::atomic< CType > n; 00194 mutex_type mtx; 00195 #ifdef _DIST_CNC_ 00196 tbb::atomic< int > nCounts; 00197 tbb::atomic< int > nValues; 00198 int owner; 00199 00200 #endif 00201 tbb::atomic< int > status; 00202 red_tls(); 00203 red_tls( const IType & v ); 00204 red_tls( const red_tls & rt ); 00205 private: 00206 void operator=( const red_tls & rt ); 00207 }; 00208 typedef tbb::concurrent_unordered_map< OTag, red_tls > tls_map_type; 00209 00210 // callback for collection a 00211 struct on_item_put : public icoll_type::callback_type 00212 { 00213 on_item_put( reduce_type * r ); 00214 void on_put( const ITag & tag, const IType & val ); 00215 #ifdef _DIST_CNC_ 00216 void on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val ); 00217 #endif 00218 void add_value( const typename tls_map_type::iterator & i, const IType & val ) const; 00219 private: 00220 00221 reduce_type * m_reduce; 00222 }; 00223 friend struct on_item_put; 00224 00225 // callback for count collection 00226 struct on_count_put : public ccoll_type::callback_type 00227 { 00228 on_count_put( reduce_type * r ); 00229 void on_put( const OTag & otag, const CType & cnt ); 00230 #ifdef _DIST_CNC_ 00231 void on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner ); 00232 void on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner ); 00233 void on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt ); 00234 #endif 00235 private: 00236 reduce_type * m_reduce; 00237 }; 00238 
friend struct on_count_put; 00239 00240 typename tls_map_type::iterator get( const OTag & tag ); 00241 bool try_put_value( const OTag & otag, const typename tls_map_type::iterator & i ); 00242 #ifdef _DIST_CNC_ 00243 bool send_count( const OTag & otag, const typename tls_map_type::iterator & i, const int to, const bool always ); 00244 static int my_parent_for_root( const int root ); 00245 void try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i ); 00246 void try_send_or_put_all(); 00247 // home-grown bcast that uses a tree for each root 00248 // at some point something like this should go into the CnC runtime 00249 // returns the number of messages sent (0, 1 or 2) 00250 int bcast( CnC::serializer * ser, int root ); 00251 int bcast_count( const OTag & tag, const CType & val, const int root ); 00252 virtual void recv_msg( serializer * ser ); 00253 #endif 00254 icoll_type & m_in; 00255 ccoll_type & m_cnt; 00256 ocoll_type & m_out; 00257 on_item_put * m_ondata; 00258 on_count_put * m_oncount; 00259 ReduceOp m_op; 00260 Select m_sel; 00261 tls_map_type m_reductions; 00262 const IType m_identity; 00263 #ifdef _DIST_CNC_ 00264 tbb::atomic< int > m_nDones; 00265 bool m_alldone; 00266 #endif 00267 }; 00268 00269 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00270 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00271 00272 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00273 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls() 00274 { 00275 nreduced = 0; 00276 n = M1; 00277 status = LOCAL; 00278 #ifdef _DIST_CNC_ 00279 nCounts = -1; 00280 nValues = -1; 00281 owner = -1; 00282 #endif 00283 } 00284 00285 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00286 00287 template< typename ITag, typename 
IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00288 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const IType & v ) 00289 { 00290 val = v; 00291 nreduced = 0; 00292 n = M1; 00293 status = LOCAL; 00294 #ifdef _DIST_CNC_ 00295 nCounts = -1; 00296 nValues = -1; 00297 owner = -1; 00298 #endif 00299 } 00300 00301 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00302 00303 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00304 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const red_tls & rt ) 00305 : val( rt.val ), 00306 nreduced( rt.nreduced ), 00307 n( rt.n ), 00308 mtx(), 00309 #ifdef _DIST_CNC_ 00310 nCounts( rt.nCounts ), 00311 nValues( rt.nValues ), 00312 owner( rt.owner ), 00313 #endif 00314 status( rt.status ) 00315 {} 00316 00317 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00318 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00319 00320 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00321 template< typename Ctxt > 00322 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::reduction( CnC::context< Ctxt > & ctxt, const std::string & name, 00323 icoll_type & in, ccoll_type & c, ocoll_type & out, 00324 const ReduceOp & red, const IType & identity, const Select & sel ) 00325 : CnC::graph( ctxt, name ), 00326 m_in( in ), 00327 m_cnt( c ), 00328 m_out( out ), 00329 m_ondata( new on_item_put( this ) ), 00330 m_oncount( new on_count_put( this ) ), 00331 m_op( red ), 00332 m_sel( sel ), 00333 m_reductions(), 00334 m_identity( identity ) 00335 { 
00336 // callback objects must persist the lifetime of collections 00337 m_in.on_put( m_ondata ); 00338 m_cnt.on_put( m_oncount ); 00339 #ifdef _DIST_CNC_ 00340 m_alldone = false; 00341 m_nDones = -1; 00342 #endif 00343 } 00344 00345 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00346 00347 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00348 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::~reduction() 00349 { 00350 // delete m_ondata; 00351 // delete m_oncount; 00352 } 00353 00354 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00355 00356 // sometimes you can't tell number of reduced items until all computation is done. 00357 // This call finalizes all reductions, no matter if a count was given or not. 00358 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00359 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::flush() 00360 { 00361 #ifdef _DIST_CNC_ 00362 if( Internal::distributor::active() ) { 00363 if( trace_level() > 0 ) { 00364 Internal::Speaker oss(std::cout); 00365 oss << this->name() << " flush: bcast ALLDONE"; 00366 } 00367 CNC_ASSERT( CnC::tuner_base::myPid() == 0 ); 00368 m_alldone = true; 00369 CnC::serializer * ser = this->new_serializer(); 00370 (*ser) & DISTRED::ALLDONE; 00371 m_nDones = 1000; // protect from current messages 00372 m_nDones += bcast( ser, 0 ); 00373 m_nDones -= 999; 00374 try_send_or_put_all(); 00375 } else 00376 #endif 00377 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) { 00378 m_out.put( i->first, i->second.val.combine( m_op ) ); 00379 } 00380 } 00381 00382 template< typename ITag, typename IType, typename ITuner, typename CType, 
typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00383 typename reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::tls_map_type::iterator 00384 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::get( const OTag & tag ) 00385 { 00386 typename tls_map_type::iterator i = m_reductions.find( tag ); 00387 if( i == m_reductions.end() ) { 00388 i = m_reductions.insert( typename tls_map_type::value_type( tag, red_tls() ) ).first; 00389 } 00390 return i; 00391 } 00392 00393 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00394 00395 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00396 bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_put_value( const OTag & otag, const typename tls_map_type::iterator & i ) 00397 { 00398 if( trace_level() > 2 ) { 00399 Internal::Speaker oss(std::cout); 00400 oss << this->name() << " try_put_value ["; 00401 cnc_format( oss, otag ) << "]" 00402 #ifdef _DIST_CNC_ 00403 << " nValues " << i->second.nValues << " status " << i->second.status 00404 #endif 00405 ; 00406 } 00407 if( i->second.nreduced == i->second.n ) { 00408 // setting n could go in parallel, it sets new n and then compares 00409 mutex_type::scoped_lock _lock( i->second.mtx ); 00410 if( i->second.status != DONE ) { 00411 this->m_out.put( otag, i->second.val.combine( this->m_op ) ); 00412 i->second.status = DONE; 00413 return true; 00414 } 00415 } 00416 return false; 00417 } 00418 00419 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00420 00421 #ifdef _DIST_CNC_ 00422 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00423 bool reduction< ITag, IType, ITuner, CType, 
CTuner, OTag, OTuner, ReduceOp, Select >::send_count( const OTag & otag, const typename tls_map_type::iterator & i, 00424 const int to, const bool always ) 00425 { 00426 // we might have a count-put and the last value-put concurrently and all local 00427 // only then the owner/to can be <0 or myself 00428 if( to < 0 || to == CnC::tuner_base::myPid() ) return false; 00429 CType _cnt = i->second.nreduced.fetch_and_store( 0 ); 00430 if( always || _cnt > 0 ) { // someone else might have transmitted the combined count already 00431 CnC::serializer * ser = this->new_serializer(); 00432 (*ser) & DISTRED::GATHERCOUNT & otag & _cnt; 00433 if( trace_level() > 2 ) { 00434 Internal::Speaker oss(std::cout); 00435 oss << this->name() << " send GATHERCOUNT ["; 00436 cnc_format( oss, otag ) << "] " << _cnt << " to " << to; 00437 } 00438 this->send_msg( ser, to ); 00439 return true; 00440 } 00441 return false; 00442 } 00443 00444 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00445 00446 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00447 int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::my_parent_for_root( const int root ) 00448 { 00449 const int mpid = CnC::tuner_base::myPid(); 00450 const int nps = CnC::tuner_base::numProcs(); 00451 CNC_ASSERT( root != mpid ); 00452 int _p = ( ( ( mpid >= root ? 
( mpid - root ) : ( mpid + nps - root ) ) - 1 ) / 2 ) + root; 00453 return _p % nps; 00454 }; 00455 00456 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00457 00458 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00459 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i ) 00460 { 00461 if( trace_level() > 2 ) { 00462 Internal::Speaker oss(std::cout); 00463 oss << this->name() << " try_send_or_put_value ["; 00464 cnc_format( oss, otag ) << "] nValues " << i->second.nValues << " status " << i->second.status; 00465 } 00466 if( i->second.nValues.fetch_and_decrement() == 1 ) { 00467 if( i->second.owner == CnC::tuner_base::myPid() ) { 00468 if( i->second.status == FINISH ) { 00469 CNC_ASSERT( i->second.nreduced == i->second.n || i->second.n == M1 ); 00470 CNC_ASSERT( i->second.nValues == 0 && i->second.status == FINISH ); 00471 i->second.nreduced = i->second.n; 00472 try_put_value( otag, i ); 00473 } 00474 } else { 00475 CnC::serializer * ser = this->new_serializer(); 00476 IType _val( i->second.val.combine( this->m_op ) ); 00477 (*ser) & DISTRED::VALUE & otag & _val; 00478 const int to = my_parent_for_root( i->second.owner ); 00479 if( trace_level() > 2 ) { 00480 Internal::Speaker oss(std::cout); 00481 oss << this->name() << " send VALUE ["; 00482 cnc_format( oss, otag ) << "] "; 00483 cnc_format( oss, _val ) << " to " << to; 00484 } 00485 this->send_msg( ser, to ); 00486 i->second.status = DONE; 00487 } 00488 } 00489 } 00490 00491 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00492 00493 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00494 int reduction< ITag, IType, 
ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast( CnC::serializer * ser, int root ) 00495 { 00496 const int mpid = CnC::tuner_base::myPid(); 00497 const int nps = CnC::tuner_base::numProcs(); 00498 int _r1 = ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) + 1 ) * 2 - 1; 00499 if( _r1 < nps ) { 00500 if( _r1 < nps-1 ) { 00501 // we have 2 children 00502 _r1 = (root + _r1) % nps; 00503 int _recvrs[2] = { _r1, (_r1+1)%nps }; 00504 this->bcast_msg( ser, _recvrs, 2 ); 00505 return 2; 00506 } else { 00507 // we have only a single child 00508 _r1 = (root + _r1) % nps; 00509 this->send_msg( ser, _r1 ); 00510 return 1; 00511 } 00512 } 00513 delete ser; 00514 // we are a leaf, nothing to be sent 00515 return 0; 00516 } 00517 00518 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00519 00520 // home-grown bcast that uses a tree for each root 00521 // at some point something like this should go into the CnC runtime 00522 // returns the number of messages sent (0, 1 or 2) 00523 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00524 int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast_count( const OTag & tag, const CType & val, const int root ) 00525 { 00526 CnC::serializer * ser = this->new_serializer(); 00527 (*ser) & ( val != M1 ? DISTRED::BCASTCOUNT : DISTRED::DONE ) & tag & root; 00528 if( val != M1 ) (*ser) & val; 00529 int _c = bcast( ser, root ); 00530 if( _c && trace_level() > 2 ) { 00531 Internal::Speaker oss(std::cout); 00532 oss << this->name() << " bcast " << (val != M1 ? 
" BCASTCOUNT [" : " DONE ["); 00533 cnc_format( oss, tag ) << "]"; 00534 } 00535 return _c; 00536 }; 00537 00538 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00539 00540 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00541 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_all() 00542 { 00543 CNC_ASSERT( m_alldone ); 00544 if( trace_level() > 2 ) { 00545 Internal::Speaker oss(std::cout); 00546 oss << this->name() << " try_send_or_put_all " << m_nDones; 00547 } 00548 if( --m_nDones == 0 ) { 00549 if( CnC::tuner_base::myPid() == 0 ) { 00550 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) { 00551 m_out.put( i->first, i->second.val.combine( m_op ) ); 00552 } 00553 } else { 00554 int _n = m_reductions.size(); 00555 CnC::serializer * ser = this->new_serializer(); 00556 (*ser) & DISTRED::ALLVALUES & _n; 00557 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) { 00558 IType _val( i->second.val.combine( this->m_op ) ); 00559 (*ser) & i->first & _val; 00560 } 00561 const int to = my_parent_for_root( 0 ); 00562 if( trace_level() > 2 ) { 00563 Internal::Speaker oss(std::cout); 00564 oss << this->name() << " send ALLVALUES to " << to; 00565 } 00566 this->send_msg( ser, to ); 00567 } 00568 } 00569 } 00570 00571 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00572 00573 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00574 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::recv_msg( serializer * ser ) 00575 { 00576 char _op; 00577 (*ser) & _op; 00578 00579 switch( _op ) { 00580 case DISTRED::GATHERCOUNT : { 00581 OTag _tag; 
00582 CType _cnt; 00583 (*ser) & _tag & _cnt; 00584 if( trace_level() > 2 ) { 00585 Internal::Speaker oss(std::cout); 00586 oss << this->name() << " recvd GATHERCOUNT ["; 00587 cnc_format( oss, _tag ) << "] " << _cnt; 00588 } 00589 m_oncount->on_gatherCount( _tag, get( _tag ), _cnt ); 00590 break; 00591 } 00592 case DISTRED::BCASTCOUNT : { 00593 OTag _tag; 00594 CType _cnt; 00595 int _owner; 00596 (*ser) & _tag & _owner & _cnt ; 00597 if( trace_level() > 2 ) { 00598 Internal::Speaker oss(std::cout); 00599 oss << this->name() << " recvd BCASTCOUNT ["; 00600 cnc_format( oss, _tag ) << "] " << _cnt << " " << _owner; 00601 } 00602 m_oncount->on_bcastCount( _tag, get( _tag ), _cnt, _owner ); 00603 break; 00604 } 00605 case DISTRED::VALUE : { 00606 OTag _tag; 00607 IType _val; 00608 (*ser) & _tag & _val; 00609 if( trace_level() > 2 ) { 00610 Internal::Speaker oss(std::cout); 00611 oss << this->name() << " recvd VALUE ["; 00612 cnc_format( oss, _tag ) << "] "; 00613 cnc_format( oss, _val ); 00614 } 00615 m_ondata->on_value( _tag, get( _tag ), _val ); 00616 break; 00617 } 00618 case DISTRED::DONE : { 00619 OTag _tag; 00620 int _owner; 00621 (*ser) & _tag & _owner; 00622 if( trace_level() > 2 ) { 00623 Internal::Speaker oss(std::cout); 00624 oss << this->name() << " recvd DONE ["; 00625 cnc_format( oss, _tag ) << "] " << _owner; 00626 } 00627 m_oncount->on_done( _tag, get( _tag ), _owner ); 00628 break; 00629 } 00630 case DISTRED::ALLDONE : { 00631 if( trace_level() > 2 ) { 00632 Internal::Speaker oss(std::cout); 00633 oss << this->name() << " recvd ALLDONE"; 00634 } 00635 m_alldone = true; 00636 CnC::serializer * ser = this->new_serializer(); 00637 (*ser) & DISTRED::ALLDONE; 00638 m_nDones = 1000; // protect from current messages 00639 m_nDones += bcast( ser, 0 ); 00640 m_nDones -= 999; 00641 try_send_or_put_all(); 00642 break; 00643 } 00644 default: 00645 case DISTRED::ALLVALUES : { 00646 CNC_ASSERT( m_alldone = true ); 00647 int _n; 00648 (*ser) & _n; 00649 if( 
trace_level() > 2 ) { 00650 Internal::Speaker oss(std::cout); 00651 oss << this->name() << " recvd ALLVALUES " << _n; 00652 } 00653 while( _n-- ) { 00654 OTag _tag; 00655 IType _val; 00656 (*ser) & _tag & _val; 00657 const typename tls_map_type::iterator i = get( _tag ); 00658 m_ondata->add_value( i, _val ); 00659 } 00660 try_send_or_put_all(); 00661 break; 00662 } 00663 CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" ); 00664 } 00665 } 00666 00667 #endif // _DIST_CNC_ 00668 00669 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00670 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00671 00672 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00673 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_item_put( reduce_type * r ) 00674 : m_reduce( r ) 00675 {} 00676 00677 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00678 00679 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00680 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::add_value( const typename tls_map_type::iterator & i, const IType & val ) const 00681 { 00682 bool _exists; 00683 IType & _rval = i->second.val.local( _exists ); 00684 _rval = m_reduce->m_op( _exists ? 
_rval : m_reduce->m_identity, val ); 00685 } 00686 00687 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00688 00689 #ifdef _DIST_CNC_ 00690 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00691 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val ) 00692 { 00693 TRACE( "::on_value" ); 00694 add_value( i, val ); 00695 m_reduce->try_send_or_put_value( otag, i ); 00696 } 00697 #endif 00698 00699 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00700 00701 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00702 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_put( const ITag & tag, const IType & val ) 00703 { 00704 TRACE( "::on_data" ); 00705 OTag otag; 00706 if( m_reduce->m_sel( tag, otag ) ) { 00707 typename tls_map_type::iterator i = m_reduce->get( otag ); 00708 add_value( i, val ); 00709 CType _n = ++i->second.nreduced; 00710 if( m_reduce->trace_level() > 0 ) { 00711 Internal::Speaker oss(std::cout); 00712 oss << m_reduce->name() << " on_put ["; 00713 cnc_format( oss, tag ) << "] for ["; 00714 cnc_format( oss, otag ) << "] "; 00715 cnc_format( oss, val ); 00716 oss << " nred now " << _n << "/" << i->second.n 00717 #ifdef _DIST_CNC_ 00718 << " owner " << i->second.owner << " status " << i->second.status << " nCounts " << i->second.nCounts 00719 #endif 00720 ; 00721 } 00722 #ifdef _DIST_CNC_ 00723 // not yet done, might need some communication 00724 if( Internal::distributor::active() ) { 00725 // just in case, test if all was local 00726 // 2 cases 00727 // 1.: local phase or we are the owner 00728 // 
2.: we know the red-count, but the reduction is not yet done. 00729 if( i->second.status >= CNT_AVAILABLE ) { 00730 if( CnC::tuner_base::myPid() == i->second.owner ) { 00731 // the owner's put was the final put, let's trigger the done propagation 00732 if( _n == i->second.n && i->second.nCounts <= 0 ) m_reduce->m_oncount->on_done( otag, i, CnC::tuner_base::myPid() ); 00733 } else { 00734 // 2nd case 00735 // We have to report the new count (not the value) 00736 // we use an atomic variable to count, so we do not lose contributions 00737 // we don't report the value, so nothing gets inconsistent 00738 // the value is transmitted only when we know nothing more comes in (elsewhere) 00739 m_reduce->send_count( otag, i, i->second.owner, false ); 00740 } 00741 } // else means we are still collecting locally (case 1) 00742 // } 00743 } else 00744 #endif 00745 m_reduce->try_put_value( otag, i ); 00746 } else if( m_reduce->trace_level() > 0 ) { 00747 Internal::Speaker oss(std::cout); 00748 oss << m_reduce->name() << " ["; 00749 cnc_format( oss, tag ) << "] was not selected"; 00750 } 00751 } 00752 00753 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00754 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00755 00756 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00757 reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_count_put( reduce_type * r ) 00758 : m_reduce( r ) 00759 {} 00760 00761 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00762 00763 #ifdef _DIST_CNC_ 00764 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00765 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select 
>::on_count_put::on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner ) 00766 { 00767 TRACE( "::on_done" ); 00768 if( m_reduce->trace_level() > 2 ) { 00769 Internal::Speaker oss(std::cout); 00770 oss << m_reduce->name() << " on_done for ["; 00771 cnc_format( oss, otag ) << "] nred now " << i->second.nreduced << "/" << i->second.n 00772 << " owner " << i->second.owner << " status " << i->second.status; 00773 } 00774 i->second.owner = owner; 00775 if( owner != CnC::tuner_base::myPid() || i->second.status.compare_and_swap( BCAST_DONE, CNT_AVAILABLE ) == CNT_AVAILABLE ) { 00776 // "forward" through bcast-tree (we are using our home-grown bcast!) 00777 i->second.nValues = 1000; 00778 i->second.nValues += m_reduce->bcast_count( otag, M1, owner ); 00779 int _tmp = i->second.status.compare_and_swap( FINISH, BCAST_DONE ); 00780 CNC_ASSERT( owner != CnC::tuner_base::myPid() || _tmp == BCAST_DONE ); 00781 // we leave one more for ourself (no contribution to value) 00782 i->second.nValues -= 999; 00783 // we might be a leaf or all the values came back between the bcast and now 00784 m_reduce->try_send_or_put_value( otag, i ); 00785 } 00786 } 00787 00788 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00789 00790 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00791 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner ) 00792 { 00793 TRACE( "::on_bcast" ); 00794 CNC_ASSERT( cnt != M1 ); 00795 i->second.owner = owner; 00796 i->second.n = cnt; 00797 00798 int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL ); 00799 CNC_ASSERT( _tmp == LOCAL ); 00800 // "forward" through bcast-tree (we are using our home-grown bcast!) 
00801 i->second.nCounts = 1; 00802 i->second.nCounts += m_reduce->bcast_count( otag, cnt, owner ); 00803 // if we are a leaf -> trigger gather to root 00804 if( --i->second.nCounts == 0 && owner != CnC::tuner_base::myPid() ) { 00805 m_reduce->send_count( otag, i, my_parent_for_root( owner ), true ); 00806 } 00807 } 00808 00809 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00810 00811 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00812 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt ) 00813 { 00814 TRACE( "::on_gather" ); 00815 CNC_ASSERT( cnt != M1 ); 00816 i->second.nreduced += cnt; 00817 if( m_reduce->trace_level() > 2 ) { 00818 Internal::Speaker oss(std::cout); 00819 oss << m_reduce->name() << " on_gatherCount ["; 00820 cnc_format( oss, otag ) << "] now " << i->second.nreduced << "/" << i->second.n << " nCounts " << i->second.nCounts; 00821 } 00822 // at root, we might need to trigger done phase 00823 if( --i->second.nCounts <= 0 ) { // there might be extra counts 00824 if( i->second.owner == CnC::tuner_base::myPid() ) { 00825 if( i->second.n == i->second.nreduced ) { 00826 on_done( otag, i, CnC::tuner_base::myPid() ); 00827 } 00828 } else { 00829 CNC_ASSERT( i->second.nCounts == 0 ); // extra counts occur only on root 00830 m_reduce->send_count( otag, i, my_parent_for_root( i->second.owner ), true ); 00831 } 00832 } 00833 } 00834 #endif // _DIST_CNC_ 00835 00836 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00837 00838 template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select > 00839 void reduction< ITag, IType, ITuner, CType, CTuner, OTag, 
OTuner, ReduceOp, Select >::on_count_put::on_put( const OTag & otag, const CType & cnt ) 00840 { 00841 TRACE( "::on_cnt" ); 00842 typename tls_map_type::iterator i = m_reduce->get( otag ); 00843 00844 #ifdef _DIST_CNC_ 00845 if( Internal::distributor::active() ) { 00846 if( cnt != M1 ) { //is this a normal count? 00847 i->second.n = cnt; 00848 // just in case all was local, we try to finish 00849 if( ! m_reduce->try_put_value( otag, i ) ) { 00850 on_bcastCount( otag, i, cnt, CnC::tuner_base::myPid() ); 00851 } 00852 } else { // this is the done flag 00853 // we have no idea what was put remotely 00854 // -> we trigger the final gather phase 00855 int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL ); 00856 CNC_ASSERT( _tmp == LOCAL ); 00857 i->second.nCounts = 0; 00858 on_done( otag, i, CnC::tuner_base::myPid() ); 00859 } 00860 } else 00861 #endif 00862 { 00863 if( cnt != M1 ) i->second.n = cnt; 00864 else i->second.n = i->second.nreduced; 00865 m_reduce->try_put_value( otag, i ); 00866 } 00867 } 00868 00869 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00870 00871 } // namespace CnC 00872 00873 /// @} 00874 00875 #endif //_CnC_REDUCE_H_