PaCO++  0.05
ParisBlock.cc
Go to the documentation of this file.
00001 #include "ParisBlock.h"
00002 
00003 #include "Schedule.h"
00004 
00005 ParisBlock::ParisBlock(Fabrique* f ) { 
00006   setFabric(f);
00007   _darray = f->creer();
00008   _vdarray = f->vcreer(); 
00009   _vdarray->clear(); 
00010   
00011   _clttopo.total = 123; // debug
00012   _srvtopo.total = 123; // debug
00013 
00014   // param
00015   _param.type = PARISBLOCK_BLOCK; // default is bloc
00016   _param.blocksize = 1; //  default is cyclic(1)
00017   _param.unitblocksize = 1; // by default
00018   
00019 }
00020 
00021 ParisBlock::~ParisBlock() { 
00022   delete _fab; delete _darray; delete _vdarray;
00023 } 
00024 
00025 void ParisBlock::setFabric(Fabrique* f) { _fab = f; }
00026 
00027 void ParisBlock::reset() { _vdarray->clear(); }
00028 
00029 const PaCO::distLoc_t& ParisBlock::getMode() const { return _mode; }
00030 
00031 bool ParisBlock::setMode(const PaCO::distLoc_t mode) { _mode = mode; return true; } // true if ok
00032 
00033 bool ParisBlock::setClientConfiguration(const PaCO::PacoTopology_t & ctopo) { _clttopo = ctopo; return true; }
00034 bool ParisBlock::setServerConfiguration(const PaCO::PacoTopology_t & stopo) { _srvtopo = stopo; return true; }
00035 
00036 bool ParisBlock::setGlobalDataConfiguration(const PaCO::PacoGlobalData_t& gd) { _gd = gd; return true; }
00037 bool ParisBlock::setLocalDataConfiguration (const PaCO::PacoLocalData_t&  ld) {
00038   _ld.rank  = ld.rank;
00039   _ld.start = ld.start;
00040   _ld.len  = ld.len;
00041   return true;
00042 }
00043 
00044 int ParisBlock::setConfig(void * config) {
00045   _param = * ( ParisBlock_param_t*) config;  
00046   return 0;
00047 }
00048 
00049 void* ParisBlock::getConfig() {
00050   return (void*) &_param;
00051 }
00052 
00053 void ParisBlock::computeSend(const void* data, vAbstrait& vdarray, vector<unsigned>& destid) {
00054   _ld.base = (char*) data;
00055 
00056   computeSendDataBlock1D(_gd, _ld, _clttopo, _srvtopo, &_param, vdarray, destid, _mode, _comm);
00057 }
00058 
00059 bool ParisBlock::computeReceive(Abstrait* darray) {
00060 
00061   this->setClientConfiguration(darray->topo());  
00062 
00063   // Get the mode of the client
00064   this->setMode(darray->mode());
00065 
00066   unsigned nb  = nbofPart(_mode, _clttopo.total, _srvtopo.total, _ld.rank);
00067 
00068   unsigned pos;
00069 
00070   // In server side redistribution, empty dist for server nodes whose id > max client node
00071   if (darray->dist().length() != 0 ) {
00072     pos = posofPart(_mode, _srvtopo.total, darray->dist()[0].rank);
00073   } else {
00074     pos = 0;
00075   }
00076 
00077   // Initialize _vdarray & remaining (assumed clean)
00078   if (_vdarray->size() == 0) {
00079     // _vdarray is empty: so let's define its size
00080     _vdarray->size(nb);
00081     _remaining = nb;
00082   }
00083 
00084 
00085   //  fprintf(stderr, "computeReceive: Node %d : set entry %d/%d to %p\n", _ld.rank, pos, nb, darray);
00086   _vdarray->setAbstrait(pos, darray);
00087   _remaining--;
00088 
00089   //  fprintf(stderr, "computeReceive: Node %d : got %d of %d data\n", _ld.rank, _vdarray->size() - _remaining, _vdarray->size());
00090 
00091   if (_remaining == 0) {
00092     computeReceiveDataBlock1D(_vdarray, _mode, _srvtopo.total, _ld.rank, &_param, _darray, _comm);
00093     return true;
00094   } else {
00095     return false;
00096   }
00097 }
00098 
00099 Abstrait* ParisBlock::getResult() const { return _darray; }