PaCO++
0.05
|
00001 #include "PaCO++_operation.h" 00002 #include "PaCO++_src.h" 00003 00004 #include "com/paco_com.h" 00005 #include "thread/paco_thread.h" 00006 #include "DistributionLibrairie.h" 00007 #include "CommunicationScheduling.h" 00008 00009 #include <assert.h> 00010 00011 #undef DEBUG_INTERNAL 00012 00013 PaCO_operation::PaCO_operation(int _number_arguments) 00014 { 00015 // Default case : sequential client 00016 _current_id = -1; 00017 myRank = 0; 00018 mytopo.total = 1; 00019 _mode = PaCO::ServerSide; 00020 typeClient = false; 00021 termine = false; 00022 termine2 = false; 00023 client_id = -1; 00024 number_arguments = _number_arguments; 00025 libraries_in = new DistributionLibrary*[number_arguments]; 00026 libraries_out = new DistributionLibrary*[number_arguments]; 00027 #ifdef PACO_WARN_DISTLIB 00028 libraries_in_name.resize(number_arguments); 00029 libraries_out_name.resize(number_arguments); 00030 #endif 00031 for(int i=0; i<number_arguments; i++) 00032 { 00033 libraries_in[i] = NULL; 00034 libraries_out[i] = NULL; 00035 #ifdef PACO_WARN_DISTLIB 00036 libraries_in_name[i]=NULL; 00037 libraries_out_name[i]=NULL; 00038 #endif 00039 } 00040 FabManager=paco_getFabriqueManager(); 00041 FabThread=0; 00042 FabCom=0; 00043 FabComScheduling=0; 00044 my_comScheduling=0; 00045 return_op = 0; 00046 00047 // Exception handling 00048 _exception = false; 00049 _level = 0; 00050 00051 // Timing 00052 #ifdef PADICO 00053 _ticks=(padico_timing_t*)malloc(sizeof(padico_timing_t)*10); 00054 #endif 00055 00056 } 00057 00058 PaCO_operation::~PaCO_operation() 00059 { 00060 #ifdef PACO_WARN_DISTLIB 00061 for(int i=0; i<number_arguments; i++) 00062 { 00063 if (libraries_in_name[i]) 00064 free(libraries_in_name[i]); 00065 if (libraries_out_name[i]) 00066 free(libraries_out_name[i]); 00067 } 00068 #endif 00069 delete libraries_in; 00070 delete libraries_out; 00071 delete my_mutex; 00072 delete my_condition; 00073 delete my_mutex2; 00074 delete my_condition2; 00075 delete my_mutex3; 00076 // Memories must be deleted 00077 } 00078 00079 void 00080 PaCO_operation::setClientId(CORBA::Short request_uid) 00081 { 00082 client_id = request_uid; 00083 } 00084 00085 void 00086 PaCO_operation::init_context(PaCO_operation * op) 00087 { 00088 myRank = op->myRank ; 00089 mytopo = op->mytopo ; 00090 number_stubs = op->number_stubs ; 00091 00092 FabManager = op->FabManager; 00093 00094 group = op->group; 00095 my_com = op->my_com ; 00096 FabCom = op->FabCom ; 00097 00098 my_comScheduling = op->my_comScheduling ; 00099 FabComScheduling = op->FabComScheduling ; 00100 00101 // Client part 00102 typeClient = op->typeClient ; 00103 if (op->serveur_topo_aller.total > 0) 00104 { 00105 setServerTopo(op->serveur_topo_aller,"in"); 00106 } 00107 if (op->serveur_topo_retour.total > 0) 00108 { 00109 setServerTopo(op->serveur_topo_retour,"out"); 00110 } 00111 00112 // Server part 00113 FabThread = op->FabThread ; 00114 my_mutex = op->my_mutex ; 00115 my_condition = op->my_condition ; 00116 my_mutex2 = op->my_mutex2 ; 00117 my_condition2 = op->my_condition2 ; 00118 my_mutex3 = op->my_mutex3 ; 00119 } 00120 00121 void 00122 PaCO_operation::init_return_context(PaCO_operation * op) 00123 { 00124 myRank = op->myRank ; 00125 mytopo = op->mytopo ; 00126 number_stubs = op->number_stubs ; 00127 00128 FabManager = op->FabManager; 00129 00130 group = op->group ; 00131 my_com = op->my_com ; 00132 FabCom = op->FabCom ; 00133 00134 my_comScheduling = op->my_comScheduling ; 00135 FabComScheduling = op->FabComScheduling ; 00136 00137 // Client part 00138 typeClient = op->typeClient ; 00139 if (op->serveur_topo_aller.total > 0) 00140 { 00141 setServerTopo(op->serveur_topo_aller,"in"); 00142 } 00143 // Server part 00144 FabThread = op->FabThread ; 00145 my_mutex = op->my_mutex ; 00146 my_condition = op->my_condition ; 00147 my_mutex2 = op->my_mutex2 ; 00148 my_condition2 = op->my_condition2 ; 00149 my_mutex3 = op->my_mutex3 ; 00150 } 00151 00152 void 00153 PaCO_operation::init_context_proxy(PaCO_operation * op) 00154 { 00155 number_arguments = op->number_arguments; 00156 libraries_in = op->libraries_in; 00157 libraries_out = op->libraries_out; 00158 00159 for(int i=0; i<number_arguments; i++) 00160 { 00161 if (libraries_in[i]) 00162 { 00163 libraries_in[i]->setCommunicator(group); 00164 } 00165 if (libraries_out[i]) 00166 { 00167 libraries_out[i]->setCommunicator(group); 00168 } 00169 } 00170 } 00171 00172 void 00173 PaCO_operation::init_context_args(PaCO_operation * op) 00174 { 00175 return_op = 1; 00176 number_arguments = op->number_arguments; 00177 _mode = op->_mode; 00178 libraries_in = op->libraries_out; 00179 libraries_out = op->libraries_in; 00180 00181 for(int i=0; i<number_arguments; i++) 00182 { 00183 if (libraries_in[i]) 00184 { 00185 libraries_in[i]->setCommunicator(group); 00186 } 00187 if (libraries_out[i]) 00188 { 00189 libraries_out[i]->setCommunicator(group); 00190 } 00191 } 00192 } 00193 00194 void 00195 PaCO_operation::setFabManager(paco_fabrique_manager* fm) 00196 { 00197 FabManager = fm; 00198 } 00199 00200 void 00201 PaCO_operation::setComFab(paco_fabrique_com* fc) 00202 { 00203 FabCom = fc; 00204 } 00205 00206 void 00207 PaCO_operation::setLibCom(const string& LibCom, void * _group) 00208 { 00209 if (FabCom==0) 00210 { 00211 if (FabManager) 00212 FabCom = FabManager->get_com(LibCom); 00213 else 00214 { 00215 cerr << "Error: Unable to instantiate lib com "<<LibCom<<endl; 00216 abort(); 00217 } 00218 } 00219 00220 group = _group; 00221 my_com = FabCom->paco_create(group); 00222 00223 for(int i=0; i<number_arguments; i++) 00224 { 00225 if (libraries_in[i]) 00226 { 00227 libraries_in[i]->setCommunicator(group); 00228 } 00229 if (libraries_out[i]) 00230 { 00231 libraries_out[i]->setCommunicator(group); 00232 } 00233 } 00234 } 00235 00236 // void 00237 // PaCO_operation::setMode(PaCO::distLoc_t mode, int param, const string& way) throw(BadWayString) 00238 // { 00239 // //_mode = mode; 00240 // if (way == "inout") 00241 // { 00242 // libraries_in[param]->setMode(mode); 00243 // libraries_out[param]->setMode(mode); 00244 // } 00245 // else if (way == "in") 00246 // { 00247 // libraries_in[param]->setMode(mode); 00248 // } 00249 // else if (way == "out") 00250 // { 00251 // libraries_out[param]->setMode(mode); 00252 // } 00253 // else 00254 // { 00255 // // Bad way string 00256 // throw BadWayString(way); 00257 // } 00258 // } 00259 00260 void 00261 PaCO_operation::setDisLibArg(int arg_number, const string& lib_name, const string& way) throw(BadWayString) 00262 { 00263 if (way == "inout") 00264 { 00265 libraries_in [arg_number] = FabManager->get_distribution(lib_name)->create(); 00266 libraries_out[arg_number] = FabManager->get_distribution(lib_name)->create(); 00267 #ifdef PACO_WARN_DISTLIB 00268 if (libraries_in_name[arg_number]) free(libraries_in_name[arg_number]); 00269 if (libraries_out_name[arg_number]) free(libraries_out_name[arg_number]); 00270 libraries_in_name [arg_number] = strdup(lib_name.c_str()); 00271 libraries_out_name [arg_number] = strdup(lib_name.c_str()); 00272 #endif 00273 } 00274 else if (way == "in") 00275 { 00276 libraries_in[arg_number] = FabManager->get_distribution(lib_name)->create(); 00277 #ifdef PACO_WARN_DISTLIB 00278 if (libraries_in_name[arg_number]) free(libraries_in_name[arg_number]); 00279 libraries_in_name [arg_number] = strdup(lib_name.c_str()); 00280 #endif 00281 } 00282 else if (way == "out") 00283 { 00284 libraries_out[arg_number] = FabManager->get_distribution(lib_name)->create(); 00285 #ifdef PACO_WARN_DISTLIB 00286 if (libraries_out_name[arg_number]) free(libraries_out_name[arg_number]); 00287 libraries_out_name [arg_number] = strdup(lib_name.c_str()); 00288 #endif 00289 } 00290 else 00291 { 00292 // Bad way string 00293 throw BadWayString(way); 00294 } 00295 } 00296 // void 00297 // PaCO_operation::setTypeArg(int arg_number, Pattron * temp, const string& lib_name, const string& way) throw(BadWayString) 00298 // { 00299 // Fabrique * f = new Fabrique(); 00300 // f->enregistrer(temp); 00301 // if (way == "inout") 00302 // { 00303 // libraries_in[arg_number] = FabDist->paco_create(lib_name,f); 00304 // libraries_out[arg_number] = FabDist->paco_create(lib_name,f); 00305 // } 00306 // else if (way == "in") 00307 // { 00308 // libraries_in[arg_number] = FabDist->paco_create(lib_name,f); 00309 // } 00310 // else if (way == "out") 00311 // { 00312 // libraries_out[arg_number] = FabDist->paco_create(lib_name,f); 00313 // } 00314 // else 00315 // { 00316 // // Bad way string 00317 // throw BadWayString(way); 00318 // } 00319 // } 00320 00321 // void 00322 // PaCO_operation::initArg(PaCO::PacoInitData_t data, int nb_param, const string& way) throw(BadWayString) 00323 // { 00324 // if (way == "in") 00325 // { 00326 // libraries_in[nb_param]->setGlobalDataConfiguration(data.gd); 00327 // libraries_in[nb_param]->setLocalDataConfiguration (data.ld); 00328 // } 00329 // else if (way == "out") 00330 // { 00331 // libraries_out[nb_param]->setGlobalDataConfiguration(data.gd); 00332 // libraries_out[nb_param]->setLocalDataConfiguration (data.ld); 00333 // } 00334 // else 00335 // { 00336 // // Bad way string 00337 // throw BadWayString(way); 00338 // } 00339 // } 00340 00341 void 00342 PaCO_operation::init(int rank, int total) 00343 { 00344 myRank = rank; 00345 mytopo.total = total; 00346 } 00347 00348 // int 00349 // PaCO_operation::setDistribConfig(void * config, int number, const string& way) throw(BadWayString, BadWaySetStringDistribConfig) 00350 // { 00351 // if (way == "inout") 00352 // { 00353 // int ret_in = libraries_in[number]->setConfig(config); 00354 // int ret_out = libraries_out[number]->setConfig(config); 00355 // if (ret_in == ret_out) 00356 // { 00357 // return ret_in; 00358 // } 00359 // else 00360 // { 00361 // throw BadWaySetStringDistribConfig(way, ret_in, ret_out); 00362 // } 00363 // } 00364 // else if (way == "in") 00365 // { 00366 // return libraries_in[number]->setConfig(config); 00367 // } 00368 // else if (way == "out") 00369 // { 00370 // return libraries_out[number]->setConfig(config); 00371 // } 00372 // else 00373 // { 00374 // // Bad way string 00375 // throw BadWayString(way); 00376 // } 00377 // } 00378 00379 // void * 00380 // PaCO_operation::getDistribConfig(int number, const string& way) throw(BadWayString) 00381 // { 00382 // if (way == "in") 00383 // { 00384 // return libraries_in[number]->getConfig(); 00385 // } 00386 // else if (way == "out") 00387 // { 00388 // return libraries_out[number]->getConfig(); 00389 // } 00390 // else 00391 // { 00392 // // Bad way string 00393 // throw BadWayString(way); 00394 // } 00395 // } 00396 00397 void 00398 PaCO_operation::setTypeClient(bool type) 00399 { 00400 typeClient = type; 00401 } 00402 00403 00404 void 00405 PaCO_operation::setClientTopo(PaCO::PacoTopology_t _client_topo, const string& way) throw(BadWayString) 00406 { 00407 if (way == "in") 00408 { 00409 serveur_topo_retour = _client_topo; 00410 } 00411 else if (way == "out") 00412 { 00413 serveur_topo_aller = _client_topo; 00414 } 00415 else 00416 { 00417 // Bad way string 00418 throw BadWayString(way); 00419 } 00420 } 00421 00422 void 00423 PaCO_operation::setClientTopo(PaCO::PacoTopology_t _client_topo) 00424 { 00425 serveur_topo_retour = _client_topo; 00426 } 00427 00428 void 00429 PaCO_operation::setServerTopo(PaCO::PacoTopology_t _serveur_topo, const string& way) throw(BadWayString) 00430 { 00431 if (way == "in") 00432 { 00433 serveur_topo_aller = _serveur_topo; 00434 } 00435 else if (way == "out") 00436 { 00437 serveur_topo_retour = _serveur_topo; 00438 } 00439 else 00440 { 00441 // Bad way string 00442 throw BadWayString(way); 00443 } 00444 } 00445 00446 void 00447 PaCO_operation::configureTopo() 00448 { 00449 // DO NOT FORGET TO CLEAR MEMORIES (WHAT ABOUT COMMUNICATION MATRICES??) !!!! 00450 clearAllScheduleMemory(); 00451 00452 if (return_op == 0) 00453 { 00454 for (int i = 0; i < number_arguments; i++) 00455 { 00456 if (libraries_in[i] != NULL) 00457 { 00458 libraries_in[i]->setDestTopology(serveur_topo_aller); 00459 libraries_in[i]->setSourceTopology(serveur_topo_retour); 00460 libraries_in[i]->setNodeRank(myRank); 00461 } 00462 if (libraries_out[i] != NULL) 00463 { 00464 libraries_out[i]->setDestTopology(serveur_topo_retour); 00465 libraries_out[i]->setSourceTopology(serveur_topo_aller); 00466 libraries_out[i]->setNodeRank(myRank); 00467 } 00468 } 00469 } 00470 } 00471 00472 void 00473 PaCO_operation::setThreadFab(paco_fabrique_thread* ft) 00474 { 00475 FabThread = ft; 00476 } 00477 00478 void 00479 PaCO_operation::setLibThread(const string& LibThread) 00480 { 00481 if (FabThread==0) 00482 { 00483 if (FabManager) 00484 FabThread = FabManager->get_thread(LibThread); 00485 else 00486 { 00487 cerr << "Error: Unable to instantiate lib thread "<<LibThread<<endl; 00488 abort(); 00489 } 00490 00491 } 00492 00493 my_mutex = FabThread->paco_create_mutex(); 00494 my_condition = FabThread->paco_create_condition(my_mutex); 00495 my_mutex2 = FabThread->paco_create_mutex(); 00496 my_condition2 = FabThread->paco_create_condition(my_mutex2); 00497 my_mutex3 = FabThread->paco_create_mutex(); 00498 } 00499 00500 void 00501 PaCO_operation::setComSchedulingFab(paco_fabrique_comScheduling* fcs) 00502 { 00503 FabComScheduling = fcs; 00504 } 00505 00506 void 00507 PaCO_operation::setLibComScheduling(const string& LibComScheduling) 00508 { 00509 if (FabComScheduling==0) 00510 { 00511 if (FabManager) 00512 FabComScheduling = FabManager->get_comScheduling(LibComScheduling); 00513 else 00514 { 00515 cerr << "Error: Unable to instantiate lib communication schedule "<<LibComScheduling<<endl; 00516 abort(); 00517 } 00518 00519 } 00520 00521 if (my_comScheduling) 00522 delete my_comScheduling; 00523 my_comScheduling = FabComScheduling->create_comScheduling(); 00524 } 00525 00526 PaCO_operation::_com_info_t* 00527 PaCO_operation::getComMemoryId(long id) 00528 { 00529 // lookup existing communication info 00530 _com_info_map_t::iterator it2 = _com_info_map.find(id); 00531 if (it2 != _com_info_map.end()) 00532 { 00533 #ifdef DEBUG_INTERNAL 00534 std::cerr <<"[INFO] found communication info for Id "<<id<<endl; 00535 #endif 00536 return _com_info_map[id]; 00537 } 00538 else 00539 { 00540 return NULL; 00541 } 00542 } 00543 00544 void 00545 PaCO_operation::noCommunicationMemory() 00546 { 00547 _current_id = -1; // _DO_NOT_USE_COMMUNICATION_MEMORY; 00548 00549 #ifdef DEBUG_INTERNAL 00550 std::cerr <<"[INFO] not using memory"<<endl; 00551 #endif 00552 00553 } 00554 00555 void 00556 PaCO_operation::useCommunicationMemoryId(long id) throw (InvalidArgument) 00557 { 00558 if (id<0) 00559 throw new InvalidArgument("useCommunicationMemeoryId expect an id >0"); 00560 00561 #ifdef DEBUG_INTERNAL 00562 std::cerr <<"[INFO] using memory Id "<<id<<endl; 00563 #endif 00564 00565 // set current id 00566 _current_id = id; 00567 00568 // Beta patch 00569 for(int i=0; i<number_arguments; i++) 00570 { 00571 if (libraries_in[i]) 00572 { 00573 bool res=libraries_in[i]->setComId(id); 00574 if (!res) { 00575 #ifdef PACO_WARN_DISTLIB 00576 std::cerr << "warning: argument #"<<i<<" with dist lib "<<libraries_in_name[i]<<" does not support comm cache\n"; 00577 #else 00578 std::cerr << "warning: argument #"<<i<<" does not support comm cache\n"; 00579 #endif 00580 } 00581 } 00582 } 00583 } 00584 00585 void 00586 PaCO_operation::clearCommunicationMemoryId(long id) throw (InvalidArgument) 00587 { 00588 if (id<0) 00589 throw new InvalidArgument("clearCommunicationMemeoryId expects an id >0"); 00590 00591 #ifdef DEBUG_INTERNAL 00592 std::cerr <<"[INFO] clearing communication memory Id "<<id<<endl; 00593 #endif 00594 00595 _com_info_map_t::iterator it2 = _com_info_map.find(id); 00596 if (it2 != _com_info_map.end()) 00597 { 00598 delete _com_info_map[id]; 00599 _com_info_map.erase(it2); 00600 } 00601 00602 assert(my_comScheduling!=0); 00603 my_comScheduling->clearScheduleId(id); 00604 } 00605 00606 void 00607 PaCO_operation::clearScheduleMemoryId(long id) throw (InvalidArgument) 00608 { 00609 assert(my_comScheduling!=0); 00610 my_comScheduling->clearScheduleId(id); 00611 } 00612 00613 void 00614 PaCO_operation::clearAllScheduleMemory() 00615 { 00616 if (my_comScheduling) 00617 my_comScheduling->clearAllSchedules(); 00618 }