PaCO++  0.05
PaCO++_operation.cc
Go to the documentation of this file.
00001 #include "PaCO++_operation.h"
00002 #include "PaCO++_src.h"
00003 
00004 #include "com/paco_com.h"
00005 #include "thread/paco_thread.h"
00006 #include "DistributionLibrairie.h"
00007 #include "CommunicationScheduling.h"
00008 
00009 #include <assert.h>
00010 
00011 #undef DEBUG_INTERNAL
00012 
00013 PaCO_operation::PaCO_operation(int _number_arguments)
00014 {
00015   // Default case : sequential client
00016   _current_id = -1;
00017   myRank = 0;
00018   mytopo.total = 1;
00019   _mode = PaCO::ServerSide;
00020   typeClient = false;
00021   termine = false;
00022   termine2 = false;
00023   client_id = -1;
00024   number_arguments = _number_arguments;
00025   libraries_in = new DistributionLibrary*[number_arguments];
00026   libraries_out = new DistributionLibrary*[number_arguments];
00027 #ifdef PACO_WARN_DISTLIB
00028   libraries_in_name.resize(number_arguments);
00029   libraries_out_name.resize(number_arguments);
00030 #endif
00031   for(int i=0; i<number_arguments; i++)
00032     {
00033       libraries_in[i] = NULL;
00034       libraries_out[i] = NULL;
00035 #ifdef PACO_WARN_DISTLIB
00036       libraries_in_name[i]=NULL;
00037       libraries_out_name[i]=NULL;
00038 #endif
00039     }
00040   FabManager=paco_getFabriqueManager();
00041   FabThread=0;
00042   FabCom=0;
00043   FabComScheduling=0;
00044   my_comScheduling=0;
00045   return_op = 0;
00046 
00047   // Exception handling
00048   _exception = false;
00049   _level = 0;
00050 
00051   // Timing
00052 #ifdef PADICO
00053   _ticks=(padico_timing_t*)malloc(sizeof(padico_timing_t)*10);
00054 #endif
00055 
00056 }
00057 
00058 PaCO_operation::~PaCO_operation()
00059 {
00060 #ifdef PACO_WARN_DISTLIB
00061   for(int i=0; i<number_arguments; i++)
00062     {
00063       if (libraries_in_name[i])
00064    free(libraries_in_name[i]);
00065       if (libraries_out_name[i])
00066    free(libraries_out_name[i]);
00067     }
00068 #endif
00069   delete libraries_in;
00070   delete libraries_out;
00071   delete my_mutex;
00072   delete my_condition;
00073   delete my_mutex2;
00074   delete my_condition2;
00075   delete my_mutex3;
00076   // Memories must be deleted
00077 }
00078 
00079 void 
00080 PaCO_operation::setClientId(CORBA::Short request_uid)
00081 {
00082   client_id = request_uid;
00083 }
00084 
00085 void 
00086 PaCO_operation::init_context(PaCO_operation * op)
00087 {
00088   myRank = op->myRank ;
00089   mytopo = op->mytopo ;
00090   number_stubs = op->number_stubs ;
00091 
00092   FabManager = op->FabManager;
00093  
00094   group = op->group;
00095   my_com = op->my_com ;
00096   FabCom = op->FabCom ;
00097 
00098   my_comScheduling = op->my_comScheduling ;
00099   FabComScheduling = op->FabComScheduling ;
00100 
00101   // Client part
00102   typeClient = op->typeClient ; 
00103   if (op->serveur_topo_aller.total > 0)
00104     {
00105       setServerTopo(op->serveur_topo_aller,"in");
00106     }
00107   if (op->serveur_topo_retour.total > 0)
00108     {
00109       setServerTopo(op->serveur_topo_retour,"out");
00110     }
00111 
00112   // Server part
00113   FabThread = op->FabThread ;
00114   my_mutex = op->my_mutex ;
00115   my_condition = op->my_condition ;
00116   my_mutex2 = op->my_mutex2 ;
00117   my_condition2 = op->my_condition2 ;
00118   my_mutex3 = op->my_mutex3 ;
00119 }
00120 
00121 void 
00122 PaCO_operation::init_return_context(PaCO_operation * op)
00123 {
00124   myRank = op->myRank ;
00125   mytopo = op->mytopo ;
00126   number_stubs = op->number_stubs ;
00127 
00128   FabManager = op->FabManager;
00129 
00130   group = op->group ;
00131   my_com = op->my_com ;
00132   FabCom = op->FabCom ;
00133 
00134   my_comScheduling = op->my_comScheduling ;
00135   FabComScheduling = op->FabComScheduling ;
00136 
00137   // Client part
00138   typeClient = op->typeClient ; 
00139   if (op->serveur_topo_aller.total > 0)
00140     {
00141       setServerTopo(op->serveur_topo_aller,"in");
00142     }
00143   // Server part
00144   FabThread = op->FabThread ;
00145   my_mutex = op->my_mutex ;
00146   my_condition = op->my_condition ;
00147   my_mutex2 = op->my_mutex2 ;
00148   my_condition2 = op->my_condition2 ;
00149   my_mutex3 = op->my_mutex3 ;
00150 }
00151 
00152 void 
00153 PaCO_operation::init_context_proxy(PaCO_operation * op)
00154 {
00155   number_arguments = op->number_arguments;
00156   libraries_in = op->libraries_in;
00157   libraries_out = op->libraries_out;
00158 
00159   for(int i=0; i<number_arguments; i++)
00160     {
00161       if (libraries_in[i])
00162         {
00163      libraries_in[i]->setCommunicator(group);
00164         }
00165       if (libraries_out[i])
00166         {
00167      libraries_out[i]->setCommunicator(group);
00168         }
00169     }
00170 }
00171 
00172 void 
00173 PaCO_operation::init_context_args(PaCO_operation * op)
00174 {
00175   return_op = 1;
00176   number_arguments = op->number_arguments;
00177   _mode = op->_mode;
00178   libraries_in = op->libraries_out;
00179   libraries_out = op->libraries_in;
00180   
00181   for(int i=0; i<number_arguments; i++)
00182     {
00183       if (libraries_in[i])
00184         {
00185      libraries_in[i]->setCommunicator(group);
00186         }
00187       if (libraries_out[i])
00188         {
00189      libraries_out[i]->setCommunicator(group);
00190         }
00191     }
00192 }
00193 
00194 void
00195 PaCO_operation::setFabManager(paco_fabrique_manager* fm)
00196 {
00197     FabManager = fm;
00198 }
00199 
00200 void
00201 PaCO_operation::setComFab(paco_fabrique_com* fc)
00202 {
00203     FabCom = fc;
00204 }
00205 
00206 void
00207 PaCO_operation::setLibCom(const string& LibCom, void * _group)
00208 {
00209   if (FabCom==0)
00210     {
00211       if (FabManager)
00212    FabCom = FabManager->get_com(LibCom);
00213       else
00214    {
00215      cerr << "Error: Unable to instantiate lib com "<<LibCom<<endl;
00216      abort();
00217    }      
00218     }
00219 
00220     group = _group;
00221     my_com = FabCom->paco_create(group);
00222 
00223     for(int i=0; i<number_arguments; i++)
00224       {
00225         if (libraries_in[i])
00226         {
00227             libraries_in[i]->setCommunicator(group);
00228         }
00229         if (libraries_out[i])
00230         {
00231             libraries_out[i]->setCommunicator(group);
00232         }
00233       }
00234 }
00235 
00236 // void
00237 // PaCO_operation::setMode(PaCO::distLoc_t mode, int param, const string& way) throw(BadWayString)
00238 // {
00239 //   //_mode = mode;
00240 //   if (way == "inout")
00241 //     {
00242 //       libraries_in[param]->setMode(mode);
00243 //       libraries_out[param]->setMode(mode);
00244 //     }
00245 //   else if (way == "in")
00246 //     {
00247 //       libraries_in[param]->setMode(mode);
00248 //     }
00249 //   else if (way == "out")
00250 //     {
00251 //       libraries_out[param]->setMode(mode);
00252 //     }
00253 //   else
00254 //     {
00255 //       // Bad way string
00256 //       throw BadWayString(way);
00257 //     }
00258 // }
00259 
00260 void
00261 PaCO_operation::setDisLibArg(int arg_number, const string& lib_name, const string& way) throw(BadWayString)
00262 {
00263   if (way == "inout")
00264     {
00265       libraries_in [arg_number] = FabManager->get_distribution(lib_name)->create();
00266       libraries_out[arg_number] = FabManager->get_distribution(lib_name)->create();
00267 #ifdef PACO_WARN_DISTLIB
00268       if (libraries_in_name[arg_number]) free(libraries_in_name[arg_number]);
00269       if (libraries_out_name[arg_number]) free(libraries_out_name[arg_number]);
00270       libraries_in_name [arg_number]  = strdup(lib_name.c_str());
00271       libraries_out_name [arg_number] = strdup(lib_name.c_str());
00272 #endif
00273     }
00274   else if (way == "in")
00275     {
00276       libraries_in[arg_number] = FabManager->get_distribution(lib_name)->create();
00277 #ifdef PACO_WARN_DISTLIB
00278       if (libraries_in_name[arg_number]) free(libraries_in_name[arg_number]);
00279       libraries_in_name [arg_number]  = strdup(lib_name.c_str());
00280 #endif
00281     }
00282   else if (way == "out")
00283     {
00284       libraries_out[arg_number] = FabManager->get_distribution(lib_name)->create();
00285 #ifdef PACO_WARN_DISTLIB
00286       if (libraries_out_name[arg_number]) free(libraries_out_name[arg_number]);
00287       libraries_out_name [arg_number] = strdup(lib_name.c_str());
00288 #endif
00289     }
00290   else
00291     {
00292       // Bad way string
00293       throw BadWayString(way);
00294     }
00295 }
00296 // void
00297 // PaCO_operation::setTypeArg(int arg_number, Pattron * temp, const string& lib_name, const string& way) throw(BadWayString)
00298 // {
00299 //   Fabrique * f = new Fabrique();
00300 //   f->enregistrer(temp);
00301 //   if (way == "inout")
00302 //     {
00303 //       libraries_in[arg_number] = FabDist->paco_create(lib_name,f);
00304 //       libraries_out[arg_number] = FabDist->paco_create(lib_name,f);
00305 //     }
00306 //   else if (way == "in")
00307 //     {
00308 //       libraries_in[arg_number] = FabDist->paco_create(lib_name,f);
00309 //     }
00310 //   else if (way == "out")
00311 //     {
00312 //       libraries_out[arg_number] = FabDist->paco_create(lib_name,f);
00313 //     }
00314 //   else
00315 //     {
00316 //       // Bad way string
00317 //       throw BadWayString(way);
00318 //     }
00319 // }
00320 
00321 // void
00322 // PaCO_operation::initArg(PaCO::PacoInitData_t data, int nb_param, const string& way) throw(BadWayString)
00323 // {
00324 //    if (way == "in")
00325 //     {
00326 //       libraries_in[nb_param]->setGlobalDataConfiguration(data.gd);
00327 //       libraries_in[nb_param]->setLocalDataConfiguration (data.ld);
00328 //     }
00329 //    else if (way == "out")
00330 //     {
00331 //       libraries_out[nb_param]->setGlobalDataConfiguration(data.gd);
00332 //       libraries_out[nb_param]->setLocalDataConfiguration (data.ld);
00333 //     }
00334 //    else
00335 //     {
00336 //       // Bad way string
00337 //       throw BadWayString(way);
00338 //     }
00339 // }
00340 
00341 void
00342 PaCO_operation::init(int rank, int total)
00343 {
00344     myRank = rank;
00345     mytopo.total = total;
00346 }
00347 
00348 // int 
00349 // PaCO_operation::setDistribConfig(void * config, int number, const string& way) throw(BadWayString, BadWaySetStringDistribConfig)
00350 // {
00351 //   if (way == "inout")
00352 //     {
00353 //       int ret_in = libraries_in[number]->setConfig(config);
00354 //       int ret_out = libraries_out[number]->setConfig(config);
00355 //       if (ret_in == ret_out)
00356 //    {
00357 //      return ret_in;
00358 //    }
00359 //       else
00360 //    {
00361 //      throw BadWaySetStringDistribConfig(way, ret_in, ret_out);
00362 //    }
00363 //     }
00364 //   else if (way == "in")
00365 //     {
00366 //       return libraries_in[number]->setConfig(config);
00367 //     }
00368 //   else if (way == "out")
00369 //     {
00370 //       return libraries_out[number]->setConfig(config);
00371 //     }
00372 //   else
00373 //     {
00374 //       // Bad way string
00375 //       throw BadWayString(way);
00376 //     }
00377 // }
00378 
00379 // void * 
00380 // PaCO_operation::getDistribConfig(int number, const string& way)  throw(BadWayString)
00381 // {
00382 //   if (way == "in")
00383 //     {
00384 //       return libraries_in[number]->getConfig();
00385 //     }
00386 //   else if (way == "out")
00387 //     {
00388 //       return libraries_out[number]->getConfig();
00389 //     }
00390 //   else
00391 //     {
00392 //       // Bad way string
00393 //       throw BadWayString(way);
00394 //     }
00395 // }
00396 
00397 void
00398 PaCO_operation::setTypeClient(bool type)
00399 {
00400     typeClient = type;
00401 }
00402 
00403 
00404 void 
00405 PaCO_operation::setClientTopo(PaCO::PacoTopology_t _client_topo, const string& way) throw(BadWayString) 
00406 {
00407    if (way == "in")
00408      {
00409        serveur_topo_retour = _client_topo;
00410      }
00411    else if (way == "out")
00412      {
00413        serveur_topo_aller = _client_topo;
00414      }
00415    else
00416      {
00417        // Bad way string
00418        throw BadWayString(way);
00419      }
00420 }
00421 
00422 void 
00423 PaCO_operation::setClientTopo(PaCO::PacoTopology_t _client_topo)
00424 {
00425   serveur_topo_retour =  _client_topo;
00426 }
00427 
00428 void 
00429 PaCO_operation::setServerTopo(PaCO::PacoTopology_t _serveur_topo, const string& way) throw(BadWayString) 
00430 {
00431    if (way == "in")
00432      {
00433        serveur_topo_aller = _serveur_topo;
00434      }
00435    else if (way == "out")
00436      {
00437        serveur_topo_retour = _serveur_topo;
00438      }
00439    else
00440      {
00441        // Bad way string
00442        throw BadWayString(way);
00443      }
00444 }
00445 
00446 void 
00447 PaCO_operation::configureTopo()
00448   {
00449     // DO NOT FORGET TO CLEAR MEMORIES (WHAT ABOUT COMMUNICATION MATRICES??) !!!!
00450     clearAllScheduleMemory();
00451     
00452     if (return_op == 0)
00453       {  
00454    for (int i = 0; i < number_arguments; i++)
00455      {
00456        if (libraries_in[i] != NULL)
00457          {
00458       libraries_in[i]->setDestTopology(serveur_topo_aller);
00459       libraries_in[i]->setSourceTopology(serveur_topo_retour);
00460       libraries_in[i]->setNodeRank(myRank);
00461          }
00462        if (libraries_out[i] != NULL)
00463          {
00464       libraries_out[i]->setDestTopology(serveur_topo_retour);
00465       libraries_out[i]->setSourceTopology(serveur_topo_aller);
00466       libraries_out[i]->setNodeRank(myRank);
00467          }
00468      }
00469       }
00470   }
00471 
00472 void
00473 PaCO_operation::setThreadFab(paco_fabrique_thread* ft)
00474 {
00475     FabThread = ft;
00476 }
00477 
00478 void
00479 PaCO_operation::setLibThread(const string& LibThread)
00480 {
00481   if (FabThread==0)
00482     {
00483       if (FabManager)
00484    FabThread = FabManager->get_thread(LibThread);
00485       else
00486    {
00487      cerr << "Error: Unable to instantiate lib thread "<<LibThread<<endl;
00488      abort();
00489    }
00490    
00491     }
00492     
00493   my_mutex = FabThread->paco_create_mutex();
00494   my_condition = FabThread->paco_create_condition(my_mutex);
00495   my_mutex2 = FabThread->paco_create_mutex();
00496   my_condition2 = FabThread->paco_create_condition(my_mutex2);
00497   my_mutex3 = FabThread->paco_create_mutex();
00498 }
00499 
00500 void
00501 PaCO_operation::setComSchedulingFab(paco_fabrique_comScheduling* fcs)
00502 {
00503     FabComScheduling = fcs;
00504 }
00505 
00506 void
00507 PaCO_operation::setLibComScheduling(const string& LibComScheduling)
00508 {
00509   if (FabComScheduling==0)
00510     {
00511       if (FabManager)
00512    FabComScheduling = FabManager->get_comScheduling(LibComScheduling);
00513       else
00514    {
00515      cerr << "Error: Unable to instantiate lib communication schedule "<<LibComScheduling<<endl;
00516      abort();
00517    }
00518    
00519     }
00520     
00521   if (my_comScheduling)
00522     delete my_comScheduling;
00523   my_comScheduling = FabComScheduling->create_comScheduling();
00524 }
00525 
00526 PaCO_operation::_com_info_t*
00527 PaCO_operation::getComMemoryId(long id)
00528 {
00529   // lookup existing communication info
00530   _com_info_map_t::iterator it2 = _com_info_map.find(id);
00531   if (it2 != _com_info_map.end()) 
00532     {
00533 #ifdef DEBUG_INTERNAL
00534       std::cerr <<"[INFO] found communication info for Id "<<id<<endl;
00535 #endif
00536       return  _com_info_map[id];
00537     }
00538   else
00539     {
00540       return NULL;
00541     }
00542 }
00543 
00544 void
00545 PaCO_operation::noCommunicationMemory()
00546 {
00547   _current_id = -1; // _DO_NOT_USE_COMMUNICATION_MEMORY;
00548 
00549 #ifdef DEBUG_INTERNAL
00550   std::cerr <<"[INFO] not using memory"<<endl;
00551 #endif
00552 
00553 }
00554 
00555 void
00556 PaCO_operation::useCommunicationMemoryId(long id) throw (InvalidArgument)
00557 {
00558   if (id<0)
00559     throw new InvalidArgument("useCommunicationMemeoryId expect an id >0");
00560 
00561 #ifdef DEBUG_INTERNAL
00562   std::cerr <<"[INFO] using memory Id "<<id<<endl;
00563 #endif
00564 
00565   // set current id
00566   _current_id = id;
00567 
00568   // Beta patch
00569   for(int i=0; i<number_arguments; i++)
00570     {
00571       if (libraries_in[i])
00572         {
00573      bool res=libraries_in[i]->setComId(id);
00574      if (!res) {
00575 #ifdef PACO_WARN_DISTLIB
00576        std::cerr << "warning: argument #"<<i<<" with dist lib "<<libraries_in_name[i]<<" does not support comm cache\n";
00577 #else
00578        std::cerr << "warning: argument #"<<i<<" does not support comm cache\n";
00579 #endif
00580      }
00581         }
00582     }
00583 }
00584 
00585 void 
00586 PaCO_operation::clearCommunicationMemoryId(long id) throw (InvalidArgument)
00587 {
00588   if (id<0)
00589     throw new InvalidArgument("clearCommunicationMemeoryId expects an id >0");
00590   
00591 #ifdef DEBUG_INTERNAL
00592   std::cerr <<"[INFO] clearing communication memory Id "<<id<<endl;
00593 #endif
00594   
00595   _com_info_map_t::iterator it2 = _com_info_map.find(id);
00596   if (it2 != _com_info_map.end())
00597     {
00598       delete _com_info_map[id];
00599       _com_info_map.erase(it2);
00600     }
00601 
00602   assert(my_comScheduling!=0);
00603   my_comScheduling->clearScheduleId(id);
00604 }
00605 
00606 void
00607 PaCO_operation::clearScheduleMemoryId(long id) throw (InvalidArgument)
00608 {
00609   assert(my_comScheduling!=0);
00610   my_comScheduling->clearScheduleId(id);
00611 }
00612 
00613 void
00614 PaCO_operation::clearAllScheduleMemory() 
00615 {
00616   if (my_comScheduling)
00617     my_comScheduling->clearAllSchedules();
00618 }