CnC
reduce.h
00001 /* *******************************************************************************
00002  *  Copyright (c) 2007-2014, Intel Corporation
00003  *
00004  *  Redistribution and use in source and binary forms, with or without
00005  *  modification, are permitted provided that the following conditions are met:
00006  *
00007  *  * Redistributions of source code must retain the above copyright notice,
00008  *    this list of conditions and the following disclaimer.
00009  *  * Redistributions in binary form must reproduce the above copyright
00010  *    notice, this list of conditions and the following disclaimer in the
00011  *    documentation and/or other materials provided with the distribution.
00012  *  * Neither the name of Intel Corporation nor the names of its contributors
00013  *    may be used to endorse or promote products derived from this software
00014  *    without specific prior written permission.
00015  *
00016  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00019  *  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
00020  *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00021  *  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00022  *  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00023  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00024  *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00025  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  ********************************************************************************/
00027 
00028 /*
00029   Reductions for CnC, provided as a re-usable graph.
00030 */
00031 
00032 #ifndef _CnC_REDUCE_H_
00033 #define _CnC_REDUCE_H_
00034 
00035 #include <cnc/internal/cnc_stddef.h>
00036 #include <cnc/internal/tbbcompat.h>
00037 #include <tbb/concurrent_unordered_map.h>
00038 #include <tbb/combinable.h>
00039 #include <tbb/atomic.h>
00040 #include <tbb/spin_rw_mutex.h>
00041 #include <functional>
00042 
00043 namespace CnC
00044 {
00045 
// Forward declaration of the reduction graph; make_reduce_graph below needs
// the name before the full class definition appears further down.
template< typename ITag, typename IType, typename ITuner,
          typename CType, typename CTuner,
          typename OTag, typename OTuner,
          typename ReduceOp, typename Select >
class reduction;
00047 
00048     /// \defgroup reductions Asynchronous Reductions
00049     /// @{
00050 
00051     /// Creates a graph for asynchronous reductions.
00052     ///
00053     /// Takes an input collection and reduces its content with a given
00054     /// operation and selection mechanism.  The computation is done
00055     /// while new items arrive. Not all items need to be available to
00056     /// start or make progress.  Data input is provided by normal puts
00057     /// into the input collection. The final reduced value for a
00058     /// reduction is put into the output collection.
00059     ///
00060     /// Supports multiple concurrent reductions (with the same
00061     /// operation) identified by a reduction id.  For this, a selector
00062     /// functor can be provided to tell which data-item goes to which
00063     /// reduction (maps a data-tag to a reduction-id).
00064     ///
00065     /// The number of reduced items per reduction-id needs to be provided
00066     /// through a second input collection. You can signal no more
00067     /// incoming values by putting a count < 0. Providing counts
00068     /// late reduces communication and potentially improves performance.
00069     ///
00070     /// Each reduction is independent of other reductions and can
00071     /// finish independently while others are still processing.
00072     /// Connected graphs can get the reduced values with normal
00073     /// get-calls (using the desired reduction-id as the tag).
00074     ///
00075     /// The implementation is virtually lock-free. On distributed memory
00076     /// the additional communication is also largely asynchronous.
00077     ///
00078     /// See also \ref reuse
00079     ///
00080     /// \param ctxt the context to which the graph belongs
00081     /// \param name the name of this reduction graph instance
00082     /// \param in   input collection, every item that's put here is
00083     ///             applied to sel and potentially takes part in a reduction
00084     /// \param cnt  input collection; number of items for each reduction
00085     ///             expected to be put here (tag is reduction-id, value is count)
00086     /// \param out  output collection, reduced results are put here with tags as returned by sel
00087     /// \param op   the reduction operation:\n
00088     ///             IType (*)(const IType&, const IType&) const\n
00089     ///             usually a functor
00090     /// \param idty the identity/neutral element for the given operation
00091     /// \param sel  functor, called once for every item put into "in":\n
00092     ///             bool (*)( const ITag & itag, OTag & otag ) const\n
00093     ///             must return true if given element should be used for a reduction, otherwise false;\n
00094     ///             if true, it must set otag to the tag of the reduction it participates in
00095     template< typename Ctxt, typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00096     graph * make_reduce_graph( CnC::context< Ctxt > & ctxt, const std::string & name, 
00097                                CnC::item_collection< ITag, IType, ITuner > & in,
00098                                CnC::item_collection< OTag, CType, CTuner > & cnt,
00099                                CnC::item_collection< OTag, IType, OTuner > & out,
00100                                const ReduceOp & op,
00101                                const IType & idty,
00102                                const Select & sel )
00103     {
00104         return new reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >( ctxt, name, in, cnt, out, op, idty, sel );
00105     }
00106 
00107 
00108     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00109     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00110     // The below is the implementation; normally users shouldn't need to read it.
00111     // However, you might want to use this as a template for writing your own reduction.
00112     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00113     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00114 
00115     
#ifdef _DIST_CNC_
// Message tags of the distributed protocol: every message of this graph starts
// with one of these chars and recv_msg() dispatches on it.
namespace DISTRED {
    static const char BCASTCOUNT  = 93; // down the owner's tree: tag, owner, count
    static const char GATHERCOUNT = 94; // gathered towards the owner: tag, count
    static const char DONE        = 95; // down the owner's tree: tag, owner (no count)
    static const char ALLDONE     = 96; // down process 0's tree, from flush(): no payload
    static const char VALUE       = 97; // to the parent in the owner's tree: tag, partial value
    static const char ALLVALUES   = 98; // to the parent in process 0's tree: n, then n (tag, value) pairs
}
#endif

// Status of each reduction (stored in red_tls::status).
static const int LOCAL         = 0; // initial status of a new reduction
static const int CNT_AVAILABLE = 1;
static const int BCAST_DONE    = 2;
static const int FINISH        = 3;
static const int DONE          = 4; // value was put (try_put_value) or sent on (try_send_or_put_value)

// Shortcut macro for ITAC instrumentation; expands to nothing unless CNC_WITH_ITAC is set.
// Note: the traced name is a function-local static, so it is built only once per call site
// (from the name of the first instance that reaches it).
#ifdef CNC_WITH_ITAC
# define TRACE( _m ) static std::string _t_n_( m_reduce->name() + _m ); VT_FUNC( _t_n_.c_str() );
#else
# define TRACE( _m )
#endif

// -1 as CType: initial value of red_tls::n and the done-flag count (see bcast_count)
#define M1 static_cast< CType >( -1 )
00143 
00144     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00145     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00146 
00147     // The actual reduction graph. A complex template construct; use
00148     // make_reduce_graph to create a reduction graph. Only the constructor, the
00149     // destructor and flush() are public; everything else is private ("hidden").
00150     //
00151     // We use tbb::combinable for the local (shared memory) reduction part.
00152     //
00153     // On distributed memory we implement the following asynchronous protocol:
00154     //   - As long as the count of a given reduction is unknown, we proceed as if everything were local.
00155     //   - As soon as the count arrives, we do the following
00156     //     0. assign ownership of the reduction to the count-providing process
00157     //        The owner controls the gathering of the distributed values.
00158     //     1a. if count is a real count, it bcasts the count
00159     //     1b. if it's a done-flag (-1), we immediately move to 3.
00160     //     2a. immediately followed by a gather of the counts (but not values)
00161     //     2b. processes which are not the owners send a message for every additional item that was not gathered in 2a
00162     //     3. once the owner sees that all items for the reduction have arrived, it bcasts DONE
00163     //     4. immediately followed by a gather of the values
00164     //     5. once the owner has collected all values, it combines them with tbb::combinable and puts the final value
00165     // As almost everything can happen at the same time, we use transactional-like 
00166     // operations implemented with atomic variables which guide each reduction through its states
00167     // We implement our own bcast/gather tree individually for each owner (as its root)
00168     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00169     class reduction : public CnC::graph//, CnC::Internal::no_copy
00170     {
00171     public:
00172         typedef CnC::item_collection< ITag, IType, ITuner > icoll_type;
00173         typedef CnC::item_collection< OTag, CType, CTuner > ccoll_type;
00174         typedef CnC::item_collection< OTag, IType, OTuner > ocoll_type;
00175 
00176         template< typename Ctxt >
00177         reduction( CnC::context< Ctxt > & ctxt, const std::string & name, icoll_type & in, ccoll_type & c, ocoll_type & out, const ReduceOp & red, const IType & identity, const Select & sel );
00178         ~reduction();
00179         
00180         // sometimes you can't tell number of reduced items until all computation is done.
00181         // This call finalizes all reductions, no matter if a count was given or not.
00182         void flush();
00183         
00184         // the implementation is "hidden"
00185     private:
00186         typedef reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select > reduce_type;
00187         typedef tbb::spin_rw_mutex mutex_type;
00188 
00189         // thread-local storage per reduction
00190         struct red_tls {
00191             tbb::combinable< IType > val;
00192             tbb::atomic< CType > nreduced;
00193             tbb::atomic< CType > n;
00194             mutex_type mtx;
00195 #ifdef _DIST_CNC_
00196             tbb::atomic< int > nCounts;
00197             tbb::atomic< int > nValues;
00198             int owner;
00199             
00200 #endif
00201             tbb::atomic< int > status;
00202             red_tls();
00203             red_tls( const IType & v );
00204             red_tls( const red_tls & rt );
00205         private:
00206             void operator=( const red_tls & rt );
00207         };
00208         typedef tbb::concurrent_unordered_map< OTag, red_tls > tls_map_type;
00209 
00210         // callback for collection a
00211         struct on_item_put : public icoll_type::callback_type
00212         {
00213             on_item_put( reduce_type * r );
00214             void on_put( const ITag & tag, const IType & val );
00215 #ifdef _DIST_CNC_
00216             void on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val );
00217 #endif
00218             void add_value( const typename tls_map_type::iterator & i, const IType & val ) const;
00219         private:
00220 
00221             reduce_type * m_reduce;
00222         };
00223         friend struct on_item_put;
00224 
00225         // callback for count collection
00226         struct on_count_put : public ccoll_type::callback_type
00227         {
00228             on_count_put( reduce_type * r );
00229             void on_put( const OTag & otag, const CType & cnt );
00230 #ifdef _DIST_CNC_
00231             void on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner );
00232             void on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner );
00233             void on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt );
00234 #endif
00235         private:
00236             reduce_type * m_reduce;
00237         };
00238         friend struct on_count_put;
00239 
00240         typename tls_map_type::iterator get( const OTag & tag );
00241         bool try_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
00242 #ifdef _DIST_CNC_
00243         bool send_count( const OTag & otag, const typename tls_map_type::iterator & i, const int to, const bool always );
00244         static int my_parent_for_root( const int root );
00245         void try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i );
00246         void try_send_or_put_all();
00247         // home-grown bcast that uses a tree for each root
00248         // at some point something like this should go into the CnC runtime
00249         // returns the number of messages sent (0, 1 or 2)
00250         int bcast( CnC::serializer * ser, int root );
00251         int bcast_count( const OTag & tag, const CType & val, const int root );
00252         virtual void recv_msg( serializer * ser );
00253 #endif
00254         icoll_type & m_in;
00255         ccoll_type & m_cnt;
00256         ocoll_type & m_out;
00257         on_item_put   * m_ondata;
00258         on_count_put   * m_oncount;
00259         ReduceOp     m_op;
00260         Select       m_sel;
00261         tls_map_type m_reductions;
00262         const IType  m_identity;
00263 #ifdef _DIST_CNC_
00264         tbb::atomic< int > m_nDones;
00265         bool         m_alldone;
00266 #endif
00267     };
00268 
00269     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00270     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00271 
00272     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00273     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls()
00274     {
00275         nreduced = 0;
00276         n = M1;
00277         status = LOCAL;
00278 #ifdef _DIST_CNC_
00279         nCounts = -1;
00280         nValues = -1;
00281         owner = -1;
00282 #endif
00283     } 
00284 
00285     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00286 
00287     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00288     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const IType & v )
00289     { 
00290         val = v;
00291         nreduced = 0;
00292         n = M1;
00293         status = LOCAL;
00294 #ifdef _DIST_CNC_
00295         nCounts = -1;
00296         nValues = -1;
00297         owner = -1;
00298 #endif
00299     }
00300 
00301     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00302 
00303     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00304     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::red_tls::red_tls( const red_tls & rt )
00305         : val( rt.val ),
00306           nreduced( rt.nreduced ),
00307           n( rt.n ),
00308           mtx(),
00309 #ifdef _DIST_CNC_
00310           nCounts( rt.nCounts ),
00311           nValues( rt.nValues ),
00312           owner( rt.owner ),
00313 #endif
00314           status( rt.status )
00315     {} 
00316 
00317     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00318     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00319 
00320     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00321     template< typename Ctxt >
00322     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::reduction( CnC::context< Ctxt > & ctxt, const std::string & name,
00323                                                                                                 icoll_type & in, ccoll_type & c, ocoll_type & out,
00324                                                                                                 const ReduceOp & red, const IType & identity, const Select & sel )
00325         : CnC::graph( ctxt, name ),
00326           m_in( in ),
00327           m_cnt( c ),
00328           m_out( out ),
00329           m_ondata( new on_item_put( this ) ),
00330           m_oncount( new on_count_put( this ) ),
00331           m_op( red ),
00332           m_sel( sel ),
00333           m_reductions(),
00334           m_identity( identity )
00335     {
00336         // callback objects must persist the lifetime of collections
00337         m_in.on_put( m_ondata );
00338         m_cnt.on_put( m_oncount );
00339 #ifdef _DIST_CNC_
00340         m_alldone = false;
00341         m_nDones = -1;
00342 #endif
00343     }
00344 
00345     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00346 
00347     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00348     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::~reduction() 
00349     {
00350         // delete m_ondata;
00351         // delete m_oncount;
00352     }
00353 
00354     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00355 
00356     // sometimes you can't tell number of reduced items until all computation is done.
00357     // This call finalizes all reductions, no matter if a count was given or not.
00358     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00359     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::flush()
00360     {
00361 #ifdef _DIST_CNC_
00362         if( Internal::distributor::active() ) {
00363             if( trace_level() > 0 ) {
00364                 Internal::Speaker oss(std::cout);
00365                 oss << this->name() << " flush: bcast ALLDONE";
00366             }
00367             CNC_ASSERT( CnC::tuner_base::myPid() == 0 );
00368             m_alldone = true;
00369             CnC::serializer * ser = this->new_serializer();
00370             (*ser) & DISTRED::ALLDONE;
00371             m_nDones = 1000; // protect from current messages
00372             m_nDones += bcast( ser, 0 );
00373             m_nDones -= 999;
00374             try_send_or_put_all();
00375         } else
00376 #endif
00377         for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00378             m_out.put( i->first, i->second.val.combine( m_op ) );
00379         }
00380     }
00381     
00382     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00383     typename reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::tls_map_type::iterator
00384     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::get( const OTag & tag )
00385     {
00386         typename tls_map_type::iterator i = m_reductions.find( tag );
00387         if( i == m_reductions.end() ) {
00388             i = m_reductions.insert( typename tls_map_type::value_type( tag, red_tls() ) ).first;
00389         }
00390         return i;
00391     }
00392     
00393     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00394 
00395     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00396     bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
00397     {
00398         if( trace_level() > 2 ) {
00399             Internal::Speaker oss(std::cout);
00400             oss << this->name() << " try_put_value [";
00401             cnc_format( oss, otag ) << "]"
00402 #ifdef _DIST_CNC_
00403                 << " nValues " << i->second.nValues << " status " << i->second.status
00404 #endif
00405                 ;
00406         }
00407         if( i->second.nreduced == i->second.n ) {
00408             // setting n could go in parallel, it sets new n and then compares
00409             mutex_type::scoped_lock _lock( i->second.mtx );
00410             if( i->second.status != DONE ) {
00411                 this->m_out.put( otag, i->second.val.combine( this->m_op ) );
00412                 i->second.status = DONE;
00413                 return true;
00414             }
00415         }
00416         return false;
00417     }
00418     
00419     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00420 
00421 #ifdef _DIST_CNC_
00422     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00423     bool reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::send_count( const OTag & otag, const typename tls_map_type::iterator & i,
00424                                                                                                       const int to, const bool always )
00425     {
00426         // we might have a count-put and the last value-put concurrently and all local
00427         // only then the owner/to can be <0 or myself
00428         if( to < 0 || to == CnC::tuner_base::myPid() ) return false;
00429         CType _cnt = i->second.nreduced.fetch_and_store( 0 );
00430         if( always || _cnt > 0 ) { // someone else might have transmitted the combined count already
00431             CnC::serializer * ser = this->new_serializer();
00432             (*ser) & DISTRED::GATHERCOUNT & otag & _cnt;
00433             if( trace_level() > 2 ) {
00434                 Internal::Speaker oss(std::cout);
00435                 oss << this->name() << " send GATHERCOUNT [";
00436                 cnc_format( oss, otag ) << "] " << _cnt << " to " << to;
00437             }
00438             this->send_msg( ser, to );
00439             return true;
00440         }
00441         return false;
00442     }
00443 
00444     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00445 
00446     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00447     int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::my_parent_for_root( const int root )
00448     {
00449         const int mpid = CnC::tuner_base::myPid();
00450         const int nps  = CnC::tuner_base::numProcs();
00451         CNC_ASSERT( root != mpid );
00452         int _p = ( ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) - 1 ) / 2 ) + root;
00453         return _p % nps;
00454     };
00455 
00456     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00457 
00458     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00459     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_value( const OTag & otag, const typename tls_map_type::iterator & i )
00460     {
00461         if( trace_level() > 2 ) {
00462             Internal::Speaker oss(std::cout);
00463             oss << this->name() << " try_send_or_put_value [";
00464             cnc_format( oss, otag ) << "] nValues " << i->second.nValues << " status " << i->second.status;
00465         }
00466         if( i->second.nValues.fetch_and_decrement() == 1 ) {
00467             if( i->second.owner == CnC::tuner_base::myPid() ) {
00468                 if( i->second.status == FINISH ) {
00469                     CNC_ASSERT( i->second.nreduced == i->second.n || i->second.n == M1 );
00470                     CNC_ASSERT( i->second.nValues == 0 && i->second.status == FINISH );
00471                     i->second.nreduced = i->second.n;
00472                     try_put_value( otag, i );
00473                 }
00474             } else {
00475                 CnC::serializer * ser = this->new_serializer();
00476                 IType _val( i->second.val.combine( this->m_op ) );
00477                 (*ser) & DISTRED::VALUE & otag & _val;
00478                 const int to = my_parent_for_root( i->second.owner );
00479                 if( trace_level() > 2 ) {
00480                     Internal::Speaker oss(std::cout);
00481                     oss << this->name() << " send VALUE [";
00482                     cnc_format( oss, otag ) << "] ";
00483                     cnc_format( oss, _val ) << " to " << to;
00484                 }
00485                 this->send_msg( ser, to );
00486                 i->second.status = DONE;
00487             }
00488         }
00489     }
00490         
00491     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00492 
00493     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00494     int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast( CnC::serializer * ser, int root ) 
00495     {
00496         const int mpid = CnC::tuner_base::myPid();
00497         const int nps  = CnC::tuner_base::numProcs();
00498         int _r1 = ( ( mpid >= root ? ( mpid - root ) : ( mpid + nps - root ) ) + 1 ) * 2 - 1;
00499         if( _r1 < nps ) {
00500             if( _r1 < nps-1 ) {
00501                 // we have 2 children
00502                 _r1 = (root + _r1) % nps;
00503                 int _recvrs[2] = { _r1, (_r1+1)%nps };
00504                 this->bcast_msg( ser, _recvrs, 2 );
00505                 return 2;
00506             } else {
00507                 // we have only a single child
00508                 _r1 = (root + _r1) % nps;
00509                 this->send_msg( ser, _r1 );
00510                 return 1;
00511             }
00512         }
00513         delete ser;
00514         // we are a leaf, nothing to be sent
00515         return 0;
00516     }
00517 
00518     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00519 
00520     // home-grown bcast that uses a tree for each root
00521     // at some point something like this should go into the CnC runtime
00522     // returns the number of messages sent (0, 1 or 2)
00523     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00524     int reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::bcast_count( const OTag & tag, const CType & val, const int root )
00525     {
00526         CnC::serializer * ser = this->new_serializer();
00527         (*ser) & ( val != M1 ? DISTRED::BCASTCOUNT : DISTRED::DONE ) & tag & root;
00528         if( val != M1 ) (*ser) & val;
00529         int _c = bcast( ser, root );
00530         if( _c && trace_level() > 2 ) {
00531             Internal::Speaker oss(std::cout);
00532             oss << this->name() << " bcast " << (val != M1 ? " BCASTCOUNT [" : " DONE [");
00533             cnc_format( oss, tag ) << "]";
00534         }
00535         return _c;
00536     };
00537 
00538     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00539 
00540     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00541     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::try_send_or_put_all()
00542     {
00543         CNC_ASSERT( m_alldone );
00544         if( trace_level() > 2 ) {
00545             Internal::Speaker oss(std::cout);
00546             oss << this->name() << " try_send_or_put_all " << m_nDones;
00547         }
00548         if( --m_nDones == 0 ) {
00549             if( CnC::tuner_base::myPid() == 0 ) {
00550                 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00551                     m_out.put( i->first, i->second.val.combine( m_op ) );
00552                 }
00553             } else {
00554                 int _n = m_reductions.size();
00555                 CnC::serializer * ser = this->new_serializer();
00556                 (*ser) & DISTRED::ALLVALUES & _n;
00557                 for( typename tls_map_type::iterator i = m_reductions.begin(); i != m_reductions.end(); ++i ) {
00558                     IType _val( i->second.val.combine( this->m_op ) );
00559                     (*ser) & i->first & _val;
00560                 }
00561                 const int to = my_parent_for_root( 0 );
00562                 if( trace_level() > 2 ) {
00563                     Internal::Speaker oss(std::cout);
00564                     oss << this->name() << " send ALLVALUES to " << to;
00565                 }
00566                 this->send_msg( ser, to );
00567             }
00568         }
00569     }
00570 
00571     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00572 
00573     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00574     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::recv_msg( serializer * ser )
00575     {
00576         char _op;
00577         (*ser) & _op;
00578 
00579         switch( _op ) {
00580         case DISTRED::GATHERCOUNT : {
00581             OTag _tag;
00582             CType _cnt;
00583             (*ser) & _tag & _cnt;
00584             if( trace_level() > 2 ) {
00585                 Internal::Speaker oss(std::cout);
00586                 oss << this->name() << " recvd GATHERCOUNT [";
00587                 cnc_format( oss, _tag ) << "] " << _cnt;
00588             }
00589             m_oncount->on_gatherCount( _tag, get( _tag ), _cnt );
00590             break;
00591         }
00592         case DISTRED::BCASTCOUNT : {
00593             OTag _tag;
00594             CType _cnt;
00595             int _owner;
00596             (*ser) & _tag & _owner & _cnt ;
00597             if( trace_level() > 2 ) {
00598                 Internal::Speaker oss(std::cout);
00599                 oss << this->name() << " recvd BCASTCOUNT [";
00600                 cnc_format( oss, _tag ) << "] " << _cnt << " " << _owner;
00601             }
00602             m_oncount->on_bcastCount( _tag, get( _tag ), _cnt, _owner );
00603             break;
00604         }
00605         case DISTRED::VALUE : {
00606             OTag _tag;
00607             IType _val;
00608             (*ser) & _tag & _val;
00609             if( trace_level() > 2 ) {
00610                 Internal::Speaker oss(std::cout);
00611                 oss << this->name() << " recvd VALUE [";
00612                 cnc_format( oss, _tag ) << "] ";
00613                 cnc_format( oss, _val );
00614             }
00615             m_ondata->on_value( _tag, get( _tag ), _val );
00616             break;
00617         }
00618         case DISTRED::DONE : {
00619             OTag _tag;
00620             int _owner;
00621             (*ser) & _tag & _owner;
00622             if( trace_level() > 2 ) {
00623                 Internal::Speaker oss(std::cout);
00624                 oss << this->name() << " recvd DONE [";
00625                 cnc_format( oss, _tag ) << "] " << _owner;
00626             }
00627             m_oncount->on_done( _tag, get( _tag ), _owner );
00628             break;
00629         }
00630         case DISTRED::ALLDONE : {
00631             if( trace_level() > 2 ) {
00632                 Internal::Speaker oss(std::cout);
00633                 oss << this->name() << " recvd ALLDONE";
00634             }
00635             m_alldone = true;
00636             CnC::serializer * ser = this->new_serializer();
00637             (*ser) & DISTRED::ALLDONE;
00638             m_nDones = 1000; // protect from current messages
00639             m_nDones += bcast( ser, 0 );
00640             m_nDones -= 999;
00641             try_send_or_put_all();
00642             break;
00643         }
00644         default:
00645         case DISTRED::ALLVALUES : {
00646             CNC_ASSERT( m_alldone = true );
00647             int _n;
00648             (*ser) & _n;
00649             if( trace_level() > 2 ) {
00650                 Internal::Speaker oss(std::cout);
00651                 oss << this->name() << " recvd ALLVALUES " << _n;
00652             }
00653             while( _n-- ) {
00654                 OTag _tag;
00655                 IType _val;
00656                 (*ser) & _tag & _val;
00657                 const typename tls_map_type::iterator i = get( _tag );
00658                 m_ondata->add_value( i, _val );
00659             }
00660             try_send_or_put_all();
00661             break;
00662         }
00663             CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
00664         }
00665     }
00666 
00667 #endif // _DIST_CNC_
00668         
00669     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00670     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00671     
00672     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00673     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_item_put( reduce_type * r )
00674         : m_reduce( r )
00675     {}
00676     
00677     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00678 
00679     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00680     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::add_value( const typename tls_map_type::iterator & i, const IType & val ) const
00681     {
00682         bool _exists;
00683         IType & _rval = i->second.val.local( _exists );
00684         _rval = m_reduce->m_op( _exists ? _rval : m_reduce->m_identity, val );
00685     }
00686  
00687     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00688 
00689 #ifdef _DIST_CNC_
00690     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00691     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_value( const OTag & otag, const typename tls_map_type::iterator & i, const IType & val )
00692     {
00693         TRACE( "::on_value" );
00694         add_value( i, val );
00695         m_reduce->try_send_or_put_value( otag, i );
00696     }
00697 #endif
00698 
00699     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00700 
00701     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00702     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_item_put::on_put( const ITag & tag, const IType & val )
00703     {
00704         TRACE( "::on_data" );
00705         OTag otag;
00706         if( m_reduce->m_sel( tag, otag ) ) {
00707             typename tls_map_type::iterator i = m_reduce->get( otag );
00708             add_value( i, val );
00709             CType _n = ++i->second.nreduced;
00710             if( m_reduce->trace_level() > 0 ) { 
00711                 Internal::Speaker oss(std::cout);
00712                 oss << m_reduce->name() << " on_put [";
00713                 cnc_format( oss, tag ) << "] for [";
00714                 cnc_format( oss, otag ) << "] ";
00715                 cnc_format( oss, val );
00716                 oss << " nred now " << _n << "/" << i->second.n
00717 #ifdef _DIST_CNC_
00718                     << " owner " << i->second.owner << " status " << i->second.status << " nCounts " << i->second.nCounts
00719 #endif
00720                     ;
00721             }
00722 #ifdef _DIST_CNC_
00723             // not yet done, might need some communication
00724             if( Internal::distributor::active() ) {
00725                 // just in case, test if all was local
00726                 // 2 cases
00727                 // 1.: local phase or we are the owner
00728                 // 2.: we know the red-count, but the reduction is not yet done.
00729                 if( i->second.status >= CNT_AVAILABLE ) {
00730                     if( CnC::tuner_base::myPid() == i->second.owner ) {
00731                         // the owner's put was the final put, let's trigger the done propagation
00732                         if( _n == i->second.n && i->second.nCounts <= 0 ) m_reduce->m_oncount->on_done( otag, i, CnC::tuner_base::myPid() );
00733                     } else {
00734                         // 2nd case
00735                         // We have to report the new count (not the value)
00736                         // we use an atomic variable to count, so we do not lose contributions
00737                         // we don't report the value, so nothing gets inconsistent
00738                         // the value is transmitted only when we know nothing more comes in (elsewhere)
00739                         m_reduce->send_count( otag, i, i->second.owner, false );
00740                     }
00741                 } // else means we are still collecting locally (case 1)
00742                 //                        }
00743             } else
00744 #endif
00745                 m_reduce->try_put_value( otag, i );
00746         } else if( m_reduce->trace_level() > 0 ) { 
00747             Internal::Speaker oss(std::cout);
00748             oss << m_reduce->name() << " [";
00749             cnc_format( oss, tag ) << "] was not selected";
00750         }
00751     }
00752             
00753     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00754     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00755 
00756     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00757     reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_count_put( reduce_type * r )
00758         : m_reduce( r )
00759     {}
00760 
00761     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00762 
00763 #ifdef _DIST_CNC_
00764     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00765     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_done( const OTag & otag, const typename tls_map_type::iterator & i, const int owner )
00766     {
00767         TRACE( "::on_done" );
00768         if( m_reduce->trace_level() > 2 ) { 
00769             Internal::Speaker oss(std::cout);
00770             oss << m_reduce->name() << " on_done for [";
00771             cnc_format( oss, otag ) << "] nred now " << i->second.nreduced << "/" << i->second.n
00772                                     << " owner " << i->second.owner << " status " << i->second.status;
00773         }
00774         i->second.owner = owner;
00775         if( owner != CnC::tuner_base::myPid() || i->second.status.compare_and_swap( BCAST_DONE, CNT_AVAILABLE ) == CNT_AVAILABLE ) {
00776             // "forward" through bcast-tree (we are using our home-grown bcast!)
00777             i->second.nValues = 1000;
00778             i->second.nValues += m_reduce->bcast_count( otag, M1, owner );
00779             int _tmp = i->second.status.compare_and_swap( FINISH, BCAST_DONE );
00780             CNC_ASSERT( owner != CnC::tuner_base::myPid() || _tmp == BCAST_DONE );
00781             // we leave one more for ourself (no contribution to value)
00782                         i->second.nValues -= 999;
00783             // we might be a leaf or all the values came back between the bcast and now
00784             m_reduce->try_send_or_put_value( otag, i );
00785         }
00786     }
00787 
00788     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00789 
00790     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00791     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_bcastCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt, const int owner )
00792     {
00793         TRACE( "::on_bcast" );
00794         CNC_ASSERT( cnt != M1 );
00795         i->second.owner = owner;
00796         i->second.n = cnt;
00797 
00798         int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
00799         CNC_ASSERT( _tmp == LOCAL );
00800         // "forward" through bcast-tree (we are using our home-grown bcast!)
00801         i->second.nCounts = 1;
00802         i->second.nCounts += m_reduce->bcast_count( otag, cnt, owner );
00803         // if we are a leaf -> trigger gather to root
00804         if( --i->second.nCounts == 0 && owner != CnC::tuner_base::myPid() ) {
00805             m_reduce->send_count( otag, i, my_parent_for_root( owner ), true );
00806         }
00807     }
00808 
00809     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00810 
00811     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00812     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_gatherCount( const OTag & otag, const typename tls_map_type::iterator & i, const CType & cnt )
00813     {
00814         TRACE( "::on_gather" );
00815         CNC_ASSERT( cnt != M1 );
00816         i->second.nreduced += cnt;
00817         if( m_reduce->trace_level() > 2 ) { 
00818             Internal::Speaker oss(std::cout);
00819             oss << m_reduce->name() << " on_gatherCount [";
00820             cnc_format( oss, otag ) << "] now " << i->second.nreduced << "/" << i->second.n << " nCounts " << i->second.nCounts;
00821         }
00822         // at root, we might need to trigger done phase
00823         if( --i->second.nCounts <= 0 ) { // there might be extra counts
00824             if( i->second.owner == CnC::tuner_base::myPid() ) {
00825                 if( i->second.n == i->second.nreduced ) {
00826                     on_done( otag, i, CnC::tuner_base::myPid() );
00827                 }
00828             } else {
00829                 CNC_ASSERT( i->second.nCounts == 0 ); // extra counts occur only on root
00830                 m_reduce->send_count( otag, i, my_parent_for_root( i->second.owner ), true );
00831             }
00832         }
00833     }
00834 #endif // _DIST_CNC_
00835 
00836     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00837 
00838     template< typename ITag, typename IType, typename ITuner, typename CType, typename CTuner, typename OTag, typename OTuner, typename ReduceOp, typename Select >
00839     void reduction< ITag, IType, ITuner, CType, CTuner, OTag, OTuner, ReduceOp, Select >::on_count_put::on_put( const OTag & otag, const CType & cnt )
00840     {
00841         TRACE( "::on_cnt" );
00842         typename tls_map_type::iterator i = m_reduce->get( otag );
00843                 
00844 #ifdef _DIST_CNC_
00845         if( Internal::distributor::active() ) {
00846             if( cnt != M1 ) { //is this a normal count?
00847                 i->second.n = cnt;
00848                 // just in case all was local, we try to finish
00849                 if( ! m_reduce->try_put_value( otag, i ) ) {
00850                     on_bcastCount( otag, i, cnt, CnC::tuner_base::myPid() );
00851                 }
00852             } else { // this is the done flag
00853                 // we have no idea what was put remotely
00854                 // -> we trigger the final gather phase
00855                 int _tmp = i->second.status.compare_and_swap( CNT_AVAILABLE, LOCAL );
00856                 CNC_ASSERT( _tmp == LOCAL );
00857                 i->second.nCounts = 0;
00858                 on_done( otag, i, CnC::tuner_base::myPid() );
00859             }
00860         } else
00861 #endif
00862             {
00863                 if( cnt != M1 ) i->second.n = cnt;
00864                 else i->second.n = i->second.nreduced;
00865                 m_reduce->try_put_value( otag, i );
00866             }
00867     }
00868 
00869     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00870 
00871 } // namespace CnC
00872 
00873 /// @}
00874 
00875 #endif //_CnC_REDUCE_H_