CnC
|
00001 /* ******************************************************************************* 00002 * Copyright (c) 2007-2014, Intel Corporation 00003 * 00004 * Redistribution and use in source and binary forms, with or without 00005 * modification, are permitted provided that the following conditions are met: 00006 * 00007 * * Redistributions of source code must retain the above copyright notice, 00008 * this list of conditions and the following disclaimer. 00009 * * Redistributions in binary form must reproduce the above copyright 00010 * notice, this list of conditions and the following disclaimer in the 00011 * documentation and/or other materials provided with the distribution. 00012 * * Neither the name of Intel Corporation nor the names of its contributors 00013 * may be used to endorse or promote products derived from this software 00014 * without specific prior written permission. 00015 * 00016 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 00017 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 00018 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 00019 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE 00020 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 00021 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 00022 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 00023 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 00024 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 00025 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 00026 ********************************************************************************/ 00027 00028 /* 00029 Joins/corss products for CnC, provided as a re-usable graph. 00030 */ 00031 00032 #ifndef _CnC_JOIN_H_ 00033 #define _CnC_JOIN_H_ 00034 00035 #include <cnc/internal/cnc_stddef.h> 00036 #include <cnc/internal/tbbcompat.h> 00037 #include <tbb/queuing_mutex.h> 00038 #include <set> 00039 00040 namespace CnC 00041 { 00042 00043 template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > class join; 00044 00045 /// Returns a graph that joins 2 tag-collections into a third one. 00046 /// Continuously produces the join product of the 2 input tag-collections 00047 /// and puts it into the output tag-collection. 00048 /// 00049 /// Accepts any types; only requires that a an output tag-type 00050 /// provides a constructor which accepts (taga, tagb) to construct 00051 /// a joined tag from tag a and b. 00052 /// 00053 /// On distributed memory duplicate output tags might be produced. 00054 /// To avoid duplicate step execution use tag-preservation on the 00055 /// output tag-collection (CnC::preserve_tuner) and suitable 00056 /// distribution functions on the prescribed step-collection 00057 /// (CnC::step_tuner). 00058 /// 00059 /// \param ctxt the context to which the graph belongs 00060 /// \param name the name of this join graph instance 00061 /// \param a first input collection 00062 /// \param b second input collection 00063 /// \param c output collection 00064 template< typename Ctxt, typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > 00065 graph * make_join_graph( CnC::context< Ctxt > & ctxt, const std::string & name, 00066 CnC::tag_collection< TagA, TunerA > & a, 00067 CnC::tag_collection< TagB, TunerB > & b, 00068 CnC::tag_collection< TagC, TunerC > & c ) 00069 { 00070 return new join< TagA, TunerA, TagB, TunerB, TagC, TunerC >( ctxt, name, a, b, c ); 00071 } 00072 00073 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00074 // %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 00075 00076 // The actual join graph. A complex template construct; use 00077 // make_join_graph to create a join graph. Only the constructor is 00078 // public, everything else is private ("hidden"). 00079 // 00080 // When ever a new tag arrives, it gets joined with all existing 00081 // tags in the other collection. The actual join is protected by a 00082 // mutex. This makes it safe on a single process. 00083 // 00084 // We cannot rely on the collection to keep tags because we 00085 // currently have no thread-safe iteration facility. Hence we use 00086 // std::set to keep them privately in the graph. 00087 // 00088 // We need explicit handling of distributed memory. We keep the 00089 // tags locally where they are put and just send a bcast message 00090 // when a new tag arrives. This can produce duplicate output tags. 00091 template< typename TagA, typename TunerA, typename TagB, typename TunerB, typename TagC, typename TunerC > 00092 class join : public CnC::graph, CnC::Internal::no_copy 00093 { 00094 typedef tbb::queuing_mutex mutex_type; 00095 typedef std::set< TagA > seta_type; 00096 typedef std::set< TagB > setb_type; 00097 typedef CnC::tag_collection< TagA, TunerA > colla_type; 00098 typedef CnC::tag_collection< TagB, TunerB > collb_type; 00099 typedef CnC::tag_collection< TagC, TunerC > collc_type; 00100 typedef join< TagA, TunerA, TagB, TunerB, TagC, TunerC > join_type; 00101 enum { JOINA = 'A', JOINB = 'B' }; 00102 friend struct on_a_put; 00103 friend struct on_b_put; 00104 00105 public: 00106 template< typename Ctxt > 00107 join( CnC::context< Ctxt > & ctxt, const std::string & name, colla_type & a, collb_type & b, collc_type & c ) 00108 : CnC::graph( ctxt, name ), 00109 m_a( a ), 00110 m_b( b ), 00111 m_c( c ), 00112 m_seta(), 00113 m_setb(), 00114 m_mutex() 00115 { 00116 // callback objects must persist the lifetime of collections 00117 m_a.on_put( new on_a_put( this ) ); 00118 m_b.on_put( new on_b_put( this ) ); 00119 } 00120 00121 // the implementation is "hidden" 00122 private: 00123 // join a given tag with all existing tags in the other set 00124 template< typename Tag, typename Set, typename joiner > 00125 void join_one( const Tag & t, const Set & set, const joiner & j ) 00126 { 00127 // in a more advanced version we can go parallel by using parallel_for or a range 00128 for( typename Set::iterator i=set.begin(); i!=set.end(); ++i ) { 00129 m_c.put( j( t, *i ) ); 00130 } 00131 } 00132 00133 // join a b-tag with a a-tag 00134 struct join_ba 00135 { 00136 TagC operator()( const TagB & b, const TagA & a ) const { 00137 return TagC( a, b ); 00138 } 00139 }; 00140 // join a a-tag with a b-tag 00141 struct join_ab 00142 { 00143 TagC operator()( const TagA & a, const TagB & b ) const { 00144 return TagC( a, b ); 00145 } 00146 }; 00147 typedef join_ab join_ab_type; 00148 typedef join_ba join_ba_type; 00149 00150 template< typename Tag > 00151 void send_tag( const Tag & tag, const char op ) 00152 { 00153 #ifdef _DIST_CNC_ 00154 if( Internal::distributor::active() ) { 00155 if( this->trace_level() > 2 ) { 00156 Internal::Speaker oss(std::cout); 00157 oss << this->name() << "::send_tag JOIN" << (op==JOINA?"A":"B") << " ["; 00158 cnc_format( oss, tag ); 00159 oss << "]"; 00160 } 00161 CnC::serializer * ser = this->new_serializer(); 00162 (*ser) & op & tag; 00163 this->bcast_msg( ser ); 00164 } 00165 #endif 00166 } 00167 00168 // callback for collection a 00169 struct on_a_put : public colla_type::callback_type 00170 { 00171 on_a_put( join_type * j ) : m_join( j ) {} 00172 00173 void on_put( const TagA & tag ) 00174 { 00175 if( m_join->trace_level() > 0 ) { 00176 Internal::Speaker oss(std::cout); 00177 oss << m_join->name() << "::on_put_a <"; 00178 cnc_format( oss, tag ); 00179 oss << ">"; 00180 } 00181 m_join->send_tag( tag, JOINA ); 00182 mutex_type::scoped_lock _lock( m_join->m_mutex ); 00183 m_join->join_one( tag, m_join->m_setb, join_ab_type() ); 00184 m_join->m_seta.insert( tag ); 00185 } 00186 private: 00187 join_type * m_join; 00188 }; 00189 00190 // callback for collection b 00191 struct on_b_put : public collb_type::callback_type 00192 { 00193 on_b_put( join_type * j ) : m_join( j ) {} 00194 00195 void on_put( const TagB & tag ) 00196 { 00197 if( m_join->trace_level() > 0 ) { 00198 Internal::Speaker oss(std::cout); 00199 oss << m_join->name() << "::on_put_b <"; 00200 cnc_format( oss, tag ); 00201 oss << ">"; 00202 } 00203 m_join->send_tag( tag, JOINB ); 00204 mutex_type::scoped_lock _lock( m_join->m_mutex ); 00205 m_join->join_one( tag, m_join->m_seta, join_ba_type() ); 00206 m_join->m_setb.insert( tag ); 00207 } 00208 private: 00209 join_type * m_join; 00210 }; 00211 00212 #ifdef _DIST_CNC_ 00213 virtual void recv_msg( serializer * ser ) 00214 { 00215 char _op; 00216 (*ser) & _op; 00217 switch( _op ) { 00218 case JOINA : { 00219 TagA _tag; 00220 (*ser) & _tag; 00221 if( this->trace_level() > 2 ) { 00222 Internal::Speaker oss(std::cout); 00223 oss << this->name() << "::recv_msg JOINA <"; 00224 cnc_format( oss, _tag ); 00225 oss << ">"; 00226 } 00227 mutex_type::scoped_lock _lock( m_mutex ); 00228 join_one( _tag, m_setb, join_ab_type() ); 00229 break; 00230 } 00231 case JOINB : { 00232 CNC_ASSERT_MSG( _op == JOINB, "Unexpected message tag" ); 00233 TagB _tag; 00234 (*ser) & _tag; 00235 if( this->trace_level() > 2 ) { 00236 Internal::Speaker oss(std::cout); 00237 oss << this->name() << "::recv_msg JOINB <"; 00238 cnc_format( oss, _tag ); 00239 oss << ">"; 00240 } 00241 mutex_type::scoped_lock _lock( m_mutex ); 00242 join_one( _tag, m_seta, join_ba_type() ); 00243 break; 00244 } 00245 default : 00246 CNC_ASSERT_MSG( false, "Unexpected message tag in JOIN" ); 00247 } 00248 } 00249 #endif 00250 00251 colla_type & m_a; 00252 collb_type & m_b; 00253 collc_type & m_c; 00254 mutex_type m_mutex; 00255 seta_type m_seta; 00256 setb_type m_setb; 00257 }; 00258 00259 00260 } // namespace CnC 00261 00262 #endif //_CnC_JOIN_H_