CnC
join.h
00001 /* *******************************************************************************
00002  *  Copyright (c) 2007-2014, Intel Corporation
00003  *
00004  *  Redistribution and use in source and binary forms, with or without
00005  *  modification, are permitted provided that the following conditions are met:
00006  *
00007  *  * Redistributions of source code must retain the above copyright notice,
00008  *    this list of conditions and the following disclaimer.
00009  *  * Redistributions in binary form must reproduce the above copyright
00010  *    notice, this list of conditions and the following disclaimer in the
00011  *    documentation and/or other materials provided with the distribution.
00012  *  * Neither the name of Intel Corporation nor the names of its contributors
00013  *    may be used to endorse or promote products derived from this software
00014  *    without specific prior written permission.
00015  *
00016  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
00017  *  AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
00018  *  IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
00019  *  DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
00020  *  FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00021  *  DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
00022  *  SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
00023  *  CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
00024  *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00025  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  ********************************************************************************/
00027 
00028 /*
00029   Joins/cross products for CnC, provided as a re-usable graph.
00030 */
00031 
00032 #ifndef _CnC_JOIN_H_
00033 #define _CnC_JOIN_H_
00034 
00035 #include <cnc/internal/cnc_stddef.h>
00036 #include <cnc/internal/tbbcompat.h>
00037 #include <tbb/queuing_mutex.h>
00038 #include <set>
00039 
00040 namespace CnC
00041 {
00042 
00043     template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > class join;
00044 
00045     /// Returns a graph that joins 2 tag-collections into a third one.
00046     /// Continuously produces the join product of the 2 input tag-collections
00047     /// and puts it into the output tag-collection.
00048     ///
00049     /// Accepts any types; only requires that a an output tag-type
00050     /// provides a constructor which accepts (taga, tagb) to construct
00051     /// a joined tag from tag a and b.
00052     ///
00053     /// On distributed memory duplicate output tags might be produced.
00054     /// To avoid duplicate step execution use tag-preservation on the
00055     /// output tag-collection (CnC::preserve_tuner) and suitable
00056     /// distribution functions on the prescribed step-collection
00057     /// (CnC::step_tuner).
00058     ///
00059     /// \param ctxt  the context to which the graph belongs
00060     /// \param name  the name of this join graph instance
00061     /// \param a     first input collection
00062     /// \param b     second input collection
00063     /// \param c     output collection
00064     template< typename Ctxt, typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
00065     graph * make_join_graph( CnC::context< Ctxt > & ctxt, const std::string & name, 
00066                              CnC::tag_collection< TagA, TunerA > & a,
00067                              CnC::tag_collection< TagB, TunerB > & b,
00068                              CnC::tag_collection< TagC, TunerC > & c )
00069     {
00070         return new join< TagA, TunerA, TagB, TunerB, TagC, TunerC >( ctxt, name, a, b, c );
00071     }
00072 
00073     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00074     // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
00075 
00076     // The actual join graph. A complex template construct; use
00077     // make_join_graph to create a join graph. Only the constructor is
00078     // public, everything else is private ("hidden").
00079     //
00080     // When ever a new tag arrives, it gets joined with all existing
00081     // tags in the other collection.  The actual join is protected by a
00082     // mutex. This makes it safe on a single process.
00083     //
00084     // We cannot rely on the collection to keep tags because we
00085     // currently have no thread-safe iteration facility. Hence we use
00086     // std::set to keep them privately in the graph.
00087     //
00088     // We need explicit handling of distributed memory. We keep the
00089     // tags locally where they are put and just send a bcast message
00090     // when a new tag arrives. This can produce duplicate output tags.
00091     template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC >
00092     class join : public CnC::graph, CnC::Internal::no_copy
00093     {
00094         typedef tbb::queuing_mutex mutex_type;
00095         typedef std::set< TagA > seta_type;
00096         typedef std::set< TagB > setb_type;
00097         typedef CnC::tag_collection< TagA, TunerA > colla_type;
00098         typedef CnC::tag_collection< TagB, TunerB > collb_type;
00099         typedef CnC::tag_collection< TagC, TunerC > collc_type;
00100         typedef join< TagA, TunerA, TagB, TunerB, TagC, TunerC > join_type;
00101         enum { JOINA = 'A', JOINB = 'B' };
00102         friend struct on_a_put;
00103         friend struct on_b_put;
00104 
00105     public:
00106         template< typename Ctxt >
00107         join( CnC::context< Ctxt > & ctxt, const std::string & name, colla_type & a, collb_type & b, collc_type & c )
00108             : CnC::graph( ctxt, name ),
00109               m_a( a ),
00110               m_b( b ),
00111               m_c( c ),
00112               m_seta(),
00113               m_setb(),
00114               m_mutex()
00115         {
00116             // callback objects must persist the lifetime of collections
00117             m_a.on_put( new on_a_put( this ) );
00118             m_b.on_put( new on_b_put( this ) );
00119         }
00120 
00121         // the implementation is "hidden"
00122         private:
00123         // join a given tag with all existing tags in the other set
00124         template< typename Tag, typename Set, typename joiner >
00125         void join_one( const Tag & t, const Set & set, const joiner & j )
00126         {
00127             // in a more advanced version we can go parallel by using parallel_for or a range
00128             for( typename Set::iterator i=set.begin(); i!=set.end(); ++i ) {
00129                 m_c.put( j( t, *i ) );
00130             }
00131         }
00132 
00133         // join a b-tag with a a-tag
00134         struct join_ba
00135         { 
00136             TagC operator()( const TagB & b, const TagA & a ) const {
00137                                 return TagC( a, b );
00138             }
00139         };
00140         // join a a-tag with a b-tag
00141         struct join_ab
00142         { 
00143             TagC operator()( const TagA & a, const TagB & b ) const {
00144                                 return TagC( a, b );
00145             }
00146         };
00147         typedef join_ab join_ab_type;
00148         typedef join_ba join_ba_type;
00149 
00150         template< typename Tag >
00151         void send_tag( const Tag & tag, const char op )
00152         {
00153 #ifdef _DIST_CNC_
00154             if( Internal::distributor::active() ) {
00155                 if( this->trace_level() > 2 ) {
00156                     Internal::Speaker oss(std::cout);
00157                     oss << this->name() << "::send_tag JOIN" << (op==JOINA?"A":"B") << " [";
00158                     cnc_format( oss, tag );
00159                     oss << "]";
00160                 }
00161                 CnC::serializer * ser = this->new_serializer();
00162                 (*ser) & op & tag;
00163                 this->bcast_msg( ser );
00164             }
00165 #endif
00166         }
00167 
00168         // callback for collection a
00169         struct on_a_put : public colla_type::callback_type
00170         {
00171             on_a_put( join_type * j  ) : m_join( j ) {}
00172 
00173             void on_put( const TagA & tag )
00174             {
00175                 if( m_join->trace_level() > 0 ) {
00176                     Internal::Speaker oss(std::cout);
00177                     oss << m_join->name() << "::on_put_a <";
00178                     cnc_format( oss, tag );
00179                     oss << ">";
00180                 }
00181                 m_join->send_tag( tag, JOINA );
00182                 mutex_type::scoped_lock _lock( m_join->m_mutex );
00183                 m_join->join_one( tag, m_join->m_setb, join_ab_type() );
00184                 m_join->m_seta.insert( tag );
00185             }
00186         private:
00187             join_type * m_join;
00188         };
00189 
00190         // callback for collection b
00191         struct on_b_put : public collb_type::callback_type
00192         {
00193             on_b_put( join_type * j  ) : m_join( j ) {}
00194 
00195             void on_put( const TagB & tag )
00196             {
00197                 if( m_join->trace_level() > 0 ) {
00198                     Internal::Speaker oss(std::cout);
00199                     oss << m_join->name() << "::on_put_b <";
00200                     cnc_format( oss, tag );
00201                     oss << ">";
00202                 }
00203                 m_join->send_tag( tag, JOINB );
00204                 mutex_type::scoped_lock _lock( m_join->m_mutex );
00205                 m_join->join_one( tag, m_join->m_seta, join_ba_type() );
00206                 m_join->m_setb.insert( tag );
00207             }
00208         private:
00209             join_type * m_join;
00210         };
00211                 
00212 #ifdef _DIST_CNC_
00213         virtual void recv_msg( serializer * ser )
00214         {
00215             char _op;
00216             (*ser) & _op;
00217             switch( _op ) {
00218             case JOINA : {
00219                 TagA _tag;
00220                 (*ser) & _tag;
00221                 if( this->trace_level() > 2 ) {
00222                     Internal::Speaker oss(std::cout);
00223                     oss << this->name() << "::recv_msg JOINA <";
00224                     cnc_format( oss, _tag );
00225                     oss << ">";
00226                 }
00227                 mutex_type::scoped_lock _lock( m_mutex );
00228                 join_one( _tag, m_setb, join_ab_type() );
00229                 break;
00230             }
00231             case JOINB : {
00232                 CNC_ASSERT_MSG( _op == JOINB, "Unexpected message tag" );
00233                 TagB _tag;
00234                 (*ser) & _tag;
00235                 if( this->trace_level() > 2 ) {
00236                     Internal::Speaker oss(std::cout);
00237                     oss << this->name() << "::recv_msg JOINB <";
00238                     cnc_format( oss, _tag );
00239                     oss << ">";
00240                 }
00241                 mutex_type::scoped_lock _lock( m_mutex );
00242                 join_one( _tag, m_seta, join_ba_type() );
00243                 break;
00244             }
00245             default :
00246                 CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" );
00247             }
00248         }
00249 #endif
00250 
00251         colla_type & m_a;
00252         collb_type & m_b;
00253         collc_type & m_c;
00254         mutex_type   m_mutex;
00255         seta_type    m_seta;
00256         setb_type    m_setb;
00257     };
00258 
00259 
00260 } // namespace CnC
00261 
00262 #endif //_CnC_JOIN_H_