PaCO++  0.05
BasicDistributionLibrary.cc
Go to the documentation of this file.
00001 #include "BasicDistributionLibrary.h"
00002 
00003 #include "Schedule.h"
00004 
00005 BasicDistributionLibrary::BasicDistributionLibrary(Fabrique* f) { 
00006   setFabric(f);
00007   _darray = f->creer();
00008   _vdarray = f->vcreer(); 
00009   _vdarray->clear(); 
00010   
00011   _clttopo.total = 123; // debug
00012   _srvtopo.total = 123; // debug
00013 }
00014 
00015 BasicDistributionLibrary::BasicDistributionLibrary(Fabrique* f, void* comm ) : _comm(comm) { 
00016   setFabric(f);
00017   _darray = f->creer();
00018   _vdarray = f->vcreer(); 
00019   _vdarray->clear(); 
00020   
00021   _clttopo.total = 123; // debug
00022   _srvtopo.total = 123; // debug
00023 }
00024 
00025 BasicDistributionLibrary::~BasicDistributionLibrary() { 
00026   delete _fab; delete _darray; delete _vdarray;
00027 } 
00028 
00029 void BasicDistributionLibrary::setCommunicator(void* comm) { _comm = comm; }
00030 
00031 void BasicDistributionLibrary::setFabric(Fabrique* f) { _fab = f; }
00032 
00033 void BasicDistributionLibrary::reset() { _vdarray->clear(); }
00034 
00035 const PaCO::distLoc_t& BasicDistributionLibrary::getMode() const { return _mode; }
00036 
00037 bool BasicDistributionLibrary::setMode(const PaCO::distLoc_t mode) { _mode = mode; return true; } // true if ok
00038 
00039 bool BasicDistributionLibrary::setClientConfiguration(const PaCO::PacoTopology_t & ctopo) { _clttopo = ctopo; return true; }
00040 bool BasicDistributionLibrary::setServerConfiguration(const PaCO::PacoTopology_t & stopo) { _srvtopo = stopo; return true; }
00041 
00042 bool BasicDistributionLibrary::setGlobalDataConfiguration(const PaCO::PacoGlobalData_t& gd) { _gd = gd; return true; }
00043 bool BasicDistributionLibrary::setLocalDataConfiguration (const PaCO::PacoLocalData_t&  ld) {
00044   _ld.rank  = ld.rank;
00045   _ld.start = ld.start;
00046   _ld.len  = ld.len;
00047   return true;
00048 }
00049 
00050 void BasicDistributionLibrary::computeSend(const void* data, vAbstrait& vdarray, vector<unsigned>& destid) {
00051   _ld.base = (char*) data;
00052 
00053   computeSendDataBlock1D(_gd, _ld, _clttopo, _srvtopo, vdarray, destid, _mode, _comm);
00054 }
00055 
00056 bool BasicDistributionLibrary::computeReceive(Abstrait* darray) {
00057 
00058   this->setClientConfiguration(darray->topo());  
00059 
00060   // Get the mode of the client
00061   this->setMode(darray->mode());
00062 
00063   unsigned nb  = nbofPart(_mode, _clttopo.total, _srvtopo.total, _ld.rank);
00064 
00065   unsigned pos;
00066 
00067   // In server side redistribution, empty dist for server nodes whose id > max client node
00068   if (darray->dist().length() != 0 ) {
00069     pos = posofPart(_mode, _srvtopo.total, darray->dist()[0].rank);
00070   } else {
00071     pos = 0;
00072   }
00073 
00074   // Initialize _vdarray & remaining (assumed clean)
00075   if (_vdarray->size() == 0) {
00076     // _vdarray is empty: so let's define its size
00077     _vdarray->size(nb);
00078     _remaining = nb;
00079   }
00080 
00081 
00082   fprintf(stderr, "computeReceive: Node %d : set entry %d/%d to %p\n", _ld.rank, pos, nb, darray);
00083   _vdarray->setAbstrait(pos, darray);
00084   _remaining--;
00085 
00086   fprintf(stderr, "computeReceive: Node %d : got %d of %d data\n", _ld.rank, _vdarray->size() - _remaining, _vdarray->size());
00087 
00088   if (_remaining == 0) {
00089     computeReceiveDataBlock1D(_vdarray, _mode, _srvtopo.total, _ld.rank, _darray, _comm);
00090     return true;
00091   } else {
00092     return false;
00093   }
00094 }
00095 
00096 Abstrait* BasicDistributionLibrary::getResult() const { return _darray; }