PaCO++
0.05
|
00001 00002 #include "PaCO++_operation.h" 00003 #include "PaCO++_src.h" 00004 00005 PaCO::PacoTopology_t PacoTopologySeq = { 1 }; 00006 00007 static ostream & 00008 operator<<(ostream & os, const CORBA::Exception & e) 00009 { 00010 CORBA::Any tmp; 00011 tmp <<=e ; 00012 CORBA::TypeCode_var tc = tmp.type(); 00013 const char * p = tc->name (); 00014 if (*p != '\0') 00015 os << p; 00016 else 00017 os << tc->id(); 00018 return os; 00019 } 00020 00021 void* paco_orb_run(void* arg) 00022 { 00023 CORBA::ORB_ptr orb = (CORBA::ORB_ptr) arg; 00024 orb->run(); 00025 return (void*) 0; 00026 } 00027 00028 paco_fabrique_manager* paco_getFabriqueManager() 00029 { 00030 static paco_fabrique_manager* pfm=NULL; 00031 00032 if (!pfm) 00033 pfm = new paco_fabrique_manager(); 00034 00035 return pfm; 00036 } 00037 00038 00039 // class InterfaceManager_impl 00040 00041 InterfaceManager_impl::InterfaceManager_impl(CORBA::ORB_ptr current_orb) 00042 { 00043 _orb = current_orb; 00044 _current_uid = 0; 00045 _totalNode = -1; 00046 _workNode = -1; 00047 _totalNodeReceived = 0; 00048 _rtn = 0; // normal 00049 } 00050 00051 InterfaceManager_impl::~InterfaceManager_impl() {} 00052 00053 void 00054 InterfaceManager_impl::setReturn(int rtn) 00055 { 00056 _rtn = rtn; 00057 } 00058 00059 void 00060 InterfaceManager_impl::setTotalNode(CORBA::Short totalNode) 00061 { 00062 if (totalNode > 0) 00063 { 00064 _totalNode = totalNode; 00065 // Init de _informations_temp; 00066 _informations_temp.nodes.length(_totalNode); 00067 _informations_temp.serveur_topo.total = _totalNode; 00068 _informations.nodes.length(_totalNode); 00069 _informations.serveur_topo.total= _totalNode; 00070 } 00071 updateContexts(); 00072 } 00073 00074 void 00075 InterfaceManager_impl::setTopo(const PaCO::PacoTopology_t& topo) 00076 { 00077 serveur_topo = topo; 00078 setTotalNode(topo.total); 00079 } 00080 00081 void 00082 InterfaceManager_impl::setWorkNode(CORBA::Short totalNode) 00083 { 00084 _workNode = totalNode; 00085 } 00086 00087 void 00088 InterfaceManager_impl::setNewNode(const char * node, const CORBA::Short rank) 00089 { 00090 00091 try 00092 { 00093 //CORBA::Object_var object = _orb->string_to_object(node); 00094 //PaCO::InterfaceParallel_var interface = PaCO::InterfaceParallel::_narrow(object); 00095 // CORBA::Short rank = interface->getDeployRank(); 00096 00097 if (_rtn == 1) 00098 cerr << "Return Proxy : Adding node : " << rank << endl; 00099 else 00100 cerr << "Adding node : " << rank << endl; 00101 // _informations_temp.nodes[_totalNodeReceived] = node; 00102 _informations_temp.nodes[rank] = node; 00103 _totalNodeReceived++; 00104 if (_totalNodeReceived == _totalNode) 00105 { 00106 if (_rtn == 1) 00107 cerr << "Return Proxy : All the node are registered" << endl; 00108 else 00109 cerr << "All the node are registered" << endl; 00110 00111 _nodes = _informations_temp.nodes; 00112 _informations.nodes = _nodes; 00113 _informations.serveur_topo = _informations_temp.serveur_topo; 00114 if (_rtn == 1) 00115 { 00116 // connection for the server side 00117 this->finish(); 00118 } 00119 _totalNodeReceived = 0; 00120 setConnectionInfos(); 00121 } 00122 } 00123 catch (const CORBA::Exception & e) 00124 { 00125 cerr << " Node : " << node << endl; 00126 cerr << " InterfaceManager_impl::setNewNode : Exception : " << e << endl; 00127 } 00128 00129 } 00130 00131 void 00132 InterfaceManager_impl::setReturn(const char * ref) 00133 { 00134 return_str = (char *) ref; 00135 // Now we have to give this reference to all the nodes 00136 CORBA::ULong i; 00137 CORBA::Object_var object; 00138 PaCO::InterfaceParallel_var interface; 00139 CORBA::ULong lgth; 00140 if (_workNode == -1) { lgth = _nodes.length();} 00141 else { lgth = _workNode;} 00142 for (i = 0; i<lgth; i++) 00143 { 00144 object = _orb->string_to_object(_nodes[i]); 00145 interface = PaCO::InterfaceParallel::_narrow(object); 00146 interface->refReturnObject(return_str.c_str()); 00147 } 00148 } 00149 00150 char * 00151 InterfaceManager_impl::getReturn() 00152 { 00153 // cerr << " Return string : " << return_str << endl; 00154 char * p = new char[return_str.length()+1]; 00155 return_str.copy(p,string::npos); 00156 p[return_str.length()]=0; 00157 // return (char *) return_str.c_str(); 00158 return p; 00159 } 00160 00161 // CORBA::Short 00162 // InterfaceManager_impl::getTotalNode() 00163 // { 00164 // return _totalNode; 00165 // } 00166 00167 void 00168 InterfaceManager_impl::return_ok(const CORBA::Short client_id) 00169 { 00170 CORBA::ULong i; 00171 CORBA::Object_var object; 00172 PaCO::InterfaceParallel_var interface; 00173 CORBA::ULong lgth; 00174 if (_workNode == -1) { lgth = _nodes.length();} 00175 else { lgth = _workNode;} 00176 cerr << "Adding a new client !!!!" << endl; 00177 // cerr << "WARNING : in this version PaCO++ only plays with the last client !" << endl; 00178 for (i = 0; i<lgth; i++) 00179 { 00180 //cerr << "Init work node : " << i << endl; 00181 object = _orb->string_to_object(_nodes[i]); 00182 interface = PaCO::InterfaceParallel::_narrow(object); 00183 interface->connect_return_object(client_id); 00184 } 00185 } 00186 00187 void 00188 InterfaceManager_impl::setConnectionInfos() 00189 { 00190 if (_rtn == 1) 00191 cerr << "Return Proxy : Deploy the parallel CORBA object" << endl; 00192 else 00193 cerr << "Deploy the parallel CORBA object" << endl; 00194 00195 // Initialize the nodes 00196 CORBA::Object_var object; 00197 PaCO::InterfaceParallel_var interface; 00198 if (_workNode == -1) 00199 { 00200 if (_rtn == 1) 00201 cerr << "Return Proxy : Deploying all the nodes" << endl; 00202 else 00203 cerr << "Deploying all the nodes" << endl; 00204 00205 for ( CORBA::ULong i = 0; i<_nodes.length(); i++) 00206 { 00207 if (_rtn == 1) 00208 cerr << "Return Proxy : Deploying node : " << i << endl; 00209 else 00210 cerr << "Deploying node : " << i << endl; 00211 00212 object = _orb->string_to_object(_nodes[i]); 00213 interface = PaCO::InterfaceParallel::_narrow(object); 00214 interface->init_InterfaceParallel(i,_nodes.length(), _informations.serveur_topo); 00215 00216 if (_rtn == 0) 00217 { 00218 cerr << "Exception handling part..." << endl; 00219 PaCO::InterfaceParallel::ref ref_nodes; 00220 ref_nodes.length(_nodes.length()); 00221 for (CORBA::ULong j = 0; j<_nodes.length(); j++) 00222 { 00223 ref_nodes[j] = _nodes[j]; 00224 } 00225 interface->setRef(ref_nodes); 00226 cerr << "Exception handling part...done" << endl; 00227 } 00228 } 00229 } 00230 else 00231 { 00232 cerr << "Deploying " << _workNode << " nodes" << endl; 00233 for ( CORBA::Long i = 0; i<_workNode; i++) 00234 { 00235 cerr << "Deploying node : " << i << endl; 00236 object = _orb->string_to_object(_nodes[i]); 00237 interface = PaCO::InterfaceParallel::_narrow(object); 00238 interface->init_InterfaceParallel(i,_nodes.length(), _informations.serveur_topo); 00239 } 00240 } 00241 if (_rtn == 0) 00242 { 00243 // connecting the client to the interface manager 00244 cerr << "Finishing...\n"; 00245 this->finish(); 00246 cerr << "Finishing...done\n"; 00247 } 00248 } 00249 00250 PaCO::ConnectionInfos* 00251 InterfaceManager_impl::getConnectionInfos() 00252 { 00253 // cerr << "In "<<__FUNCTION__ << ":"<<(void*)this<<endl; 00254 _current_uid++; 00255 _informations.request_uid = _current_uid; 00256 PaCO::ConnectionInfos* retour = new PaCO::ConnectionInfos(_informations); 00257 // cerr << "Out "<<__FUNCTION__ << ":"<<(void*)this<<endl; 00258 return retour; 00259 } 00260 00261 // class InterfaceParallel_impl 00262 00263 InterfaceParallel_impl::InterfaceParallel_impl(CORBA::ORB_ptr current_orb, char * ior) 00264 { 00265 _myRank = -1; 00266 _totalNode = -1; 00267 _orb = current_orb; 00268 _ior = ior; 00269 } 00270 00271 InterfaceParallel_impl::~InterfaceParallel_impl() 00272 { 00273 } 00274 00275 void 00276 InterfaceParallel_impl::deploy(CORBA::Short rank) 00277 { 00278 cerr << "Debut de deploy()"<< endl; 00279 _deployRank = rank; 00280 CORBA::Object_var object; 00281 object = _orb->string_to_object(_ior.c_str()); 00282 PaCO::InterfaceManager_var interface; 00283 interface = PaCO::InterfaceManager::_narrow(object); 00284 char * obj = _orb->object_to_string(_this()); 00285 try 00286 { 00287 interface->setNewNode(obj, rank); 00288 } 00289 catch (const CORBA::Exception & e) 00290 { 00291 cerr << "Argument : " << obj << endl; 00292 cerr << "InterfaceParallel_impl::deploy : Exception : " << e << endl; 00293 } 00294 cerr << "Fin de deploy()"<< endl; 00295 } 00296 00297 CORBA::Short 00298 InterfaceParallel_impl::getMyRank() 00299 { 00300 return _myRank; 00301 } 00302 00303 CORBA::Short 00304 InterfaceParallel_impl::getDeployRank() 00305 { 00306 return _deployRank; 00307 } 00308 00309 CORBA::Short 00310 InterfaceParallel_impl::getTotalNode() 00311 { 00312 return _totalNode; 00313 } 00314 00315 void 00316 InterfaceParallel_impl::init_InterfaceParallel(CORBA::Short myRank, CORBA::Short totalNode) 00317 { 00318 _myRank = myRank; 00319 _totalNode = totalNode; 00320 cerr << "Servant : " << myRank << " initialized" << endl; 00321 } 00322 00323 PaCO::ConnectionInfos* 00324 InterfaceParallel_impl::getConnectionInfos() 00325 { 00326 return NULL; 00327 } 00328 00329 void 00330 InterfaceParallel_impl::refReturnObject(const char * ref) 00331 { 00332 return_object = (char *) ref; 00333 } 00334 00335 void 00336 InterfaceParallel_impl::setRef(const PaCO::InterfaceParallel::ref & ref_nodes) 00337 { 00338 _ref_nodes.length(ref_nodes.length()); 00339 for (CORBA::ULong i = 0; i<ref_nodes.length(); i++) 00340 _ref_nodes[i] = ref_nodes[i]; 00341 }