35 #include <cnc/internal/cnc_stddef.h> 36 #include <cnc/internal/tbbcompat.h> 37 #include <tbb/queuing_mutex.h> 43 template<
typename TagA,
typename TunerA,
typename TagB,
typename TunerB,
typename TagC,
typename TunerC >
class join;
64 template<
typename Ctxt,
typename TagA,
typename TunerA,
typename TagB,
typename TunerB,
typename TagC,
typename TunerC >
70 return new join< TagA, TunerA, TagB, TunerB, TagC, TunerC >( ctxt, name, a, b, c );
91 template<
typename TagA,
typename TunerA,
typename TagB,
typename TunerB,
typename TagC,
typename TunerC >
92 class join :
public CnC::graph, CnC::Internal::no_copy
94 typedef tbb::queuing_mutex mutex_type;
95 typedef std::set< TagA > seta_type;
96 typedef std::set< TagB > setb_type;
100 typedef join< TagA, TunerA, TagB, TunerB, TagC, TunerC > join_type;
101 enum { JOINA =
'A', JOINB =
'B' }; // one-character operation codes handed to send_tag: JOINA accompanies a TagA put, JOINB a TagB put
102 friend struct on_a_put;
103 friend struct on_b_put;
106 template<
typename Ctxt >
107 join(
CnC::context< Ctxt > & ctxt,
const std::string & name, colla_type & a, collb_type & b, collc_type & c )
117 m_a.on_put(
new on_a_put(
this ) );
118 m_b.on_put(
new on_b_put(
this ) );
124 template<
typename Tag,
typename Set,
typename joiner >
125 void join_one(
const Tag & t,
const Set &
set,
const joiner & j )
128 for(
typename Set::iterator i=
set.begin(); i!=
set.end(); ++i ) {
129 m_c.put( j( t, *i ) );
136 TagC operator()(
const TagB & b,
const TagA & a )
const {
143 TagC operator()(
const TagA & a,
const TagB & b )
const {
147 typedef join_ab join_ab_type;
148 typedef join_ba join_ba_type;
150 template<
typename Tag >
151 void send_tag(
const Tag & tag,
const char op )
154 if( Internal::distributor::active() ) {
155 if( this->trace_level() > 2 ) {
156 Internal::Speaker oss(std::cout);
157 oss << this->name() <<
"::send_tag JOIN" << (op==JOINA?
"A":
"B") <<
" [";
158 cnc_format( oss, tag );
163 this->bcast_msg( ser );
171 on_a_put( join_type * j ) : m_join( j ) {} // keeps the given join pointer in m_join; the TagA on_put handler reaches the join's mutex, tag sets and send_tag through it
173 void on_put(
const TagA & tag )
175 if( m_join->trace_level() > 0 ) {
176 Internal::Speaker oss(std::cout);
177 oss << m_join->name() <<
"::on_put_a <";
178 cnc_format( oss, tag );
181 m_join->send_tag( tag, JOINA );
182 mutex_type::scoped_lock _lock( m_join->m_mutex );
183 m_join->join_one( tag, m_join->m_setb, join_ab_type() );
184 m_join->m_seta.insert( tag );
193 on_b_put( join_type * j ) : m_join( j ) {} // keeps the given join pointer in m_join; the TagB on_put handler reaches the join's mutex, tag sets and send_tag through it
195 void on_put(
const TagB & tag )
197 if( m_join->trace_level() > 0 ) {
198 Internal::Speaker oss(std::cout);
199 oss << m_join->name() <<
"::on_put_b <";
200 cnc_format( oss, tag );
203 m_join->send_tag( tag, JOINB );
204 mutex_type::scoped_lock _lock( m_join->m_mutex );
205 m_join->join_one( tag, m_join->m_seta, join_ba_type() );
206 m_join->m_setb.insert( tag );
221 if( this->trace_level() > 2 ) {
222 Internal::Speaker oss(std::cout);
223 oss << this->name() <<
"::recv_msg JOINA <";
224 cnc_format( oss, _tag );
227 mutex_type::scoped_lock _lock( m_mutex );
228 join_one( _tag, m_setb, join_ab_type() );
232 CNC_ASSERT_MSG( _op == JOINB,
"Unexpected message tag" );
235 if( this->trace_level() > 2 ) {
236 Internal::Speaker oss(std::cout);
237 oss << this->name() <<
"::recv_msg JOINB <";
238 cnc_format( oss, _tag );
241 mutex_type::scoped_lock _lock( m_mutex );
242 join_one( _tag, m_seta, join_ba_type() );
246 CNC_ASSERT_MSG(
false,
"Unexpected message tag in JOIN" );
262 #endif //_CnC_JOIN_H_
A tag collection is a set of tags of the same type. It is used to prescribe steps. By default, tags are not stored.
Internal::tag_collection_base< Tag, Tuner >::callback_type callback_type
Base class for defining and using CnC (sub-)graphs.
CnC context bringing together collections (for steps, items and tags).
graph * make_join_graph(CnC::context< Ctxt > &ctxt, const std::string &name, CnC::tag_collection< TagA, TunerA > &a, CnC::tag_collection< TagB, TunerB > &b, CnC::tag_collection< TagC, TunerC > &c)
Handles serialization of data objects.