PaCO++  0.05
Schedule.h File Reference
#include <vector>
#include <Abstrait.h>
#include "Type.h"
#include <iostream>
Include dependency graph for Schedule.h (graph not shown).


Functions

void computeReceiveBlock1D (const GlobalData_t &gd, const LocalData_t &dd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
bool computeReceiveDataBlock1D (vAbstrait *sdata, const PaCO::distLoc_t &mode, const unsigned total, const unsigned rank, const ParisBlock_param_t *param, Abstrait *varray, void *comm)
void computeSendBlock1D (const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vector< LocalData_t > &vOut)
void computeSendDataBlock1D (const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, const ParisBlock_param_t *param, vAbstrait &vdarray, vector< unsigned > &destid, const PaCO::distLoc_t &mode, void *comm)
void computeSendRemoteDataBlock1D (const GlobalData_t &gd, const LocalData_t &sd, const Topology_t &stopo, const Topology_t &dtopo, vector< LocalData_t > &vOut, const PaCO::distLoc_t mode)
void doSchedule (const GlobalData_t &gd, const LocalData_t &ld, const Topology_t &ctopo, vector< LocalData_t > &sched_send, vector< LocalData_t > &sched_recv, void *comm)
unsigned nbofPart (const PaCO::distLoc_t &mode, const unsigned stotal, const unsigned dtotal, const unsigned drank)
unsigned posofPart (const PaCO::distLoc_t &mode, const unsigned dtotal, const unsigned srank)
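
Taken together, these functions implement a build-schedule-then-exchange pattern: computeSendBlock1D and computeReceiveBlock1D append LocalData_t descriptors of what a node must send and expects to receive, and doSchedule then performs the exchange over an MPI communicator. The sketch below shows that sequence as an illustration only; the choice of dtopo as doSchedule's ctopo argument, and the use of the same local piece on the send and receive side, are assumptions, not taken from this file.

// Hypothetical call sequence; gd, ld, stopo, dtopo and param are assumed to
// be filled in elsewhere.
void redistribute1D(const GlobalData_t& gd, const LocalData_t& ld,
                    const Topology_t& stopo, const Topology_t& dtopo,
                    const ParisBlock_param_t* param, MPI_Comm comm)
{
  vector<LocalData_t> sched_send, sched_recv;

  computeSendBlock1D   (gd, ld, stopo, dtopo, param, sched_send); // what to send
  computeReceiveBlock1D(gd, ld, stopo, dtopo, param, sched_recv); // what to expect

  doSchedule(gd, ld, dtopo, sched_send, sched_recv, &comm);       // do the exchange
}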

Function Documentation

void computeReceiveBlock1D ( const GlobalData_t &  gd,
const LocalData_t &  dd,
const Topology_t &  stopo,
const Topology_t &  dtopo,
const ParisBlock_param_t *  param,
vector< LocalData_t > &  vOut 
)

Definition at line 135 of file Schedule.cc.

References BlockNumberOfElementProc(), blockSize(), getProcRangeInf(), getProcRangeSup(), NumberOfBlockProc(), and OwnerBlock().

Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().

                                                                        {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Receive Schedule--------------------\n";

  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
  fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);

#endif

  if (stopo.total == dtopo.total) {
    vOut.push_back(dd);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
#endif
  } else {
    // Append mode
    vOut.reserve(vOut.size()+stopo.total); // in block mode, at most one msg from each src node

    unsigned slbsz = blockSize(gd.len, stopo.total, param);
    
    if (gd.cyclic == 0) {

      unsigned long dlow  = dd.start;
      unsigned long dhigh = dlow + dd.len;
    
      unsigned fpid, lpid;
      fpid = getProcRangeInf(dlow,  slbsz);
      lpid = getProcRangeSup(dhigh, slbsz);
    
#ifdef DEBUG_INTERNAL
      fprintf(stderr, "  loop from %d to %d width stotal: %ld\n", fpid, lpid, stopo.total);
#endif
      
      // for each src bloc
      for(unsigned i=fpid; i <= lpid; i++) {
   
   vOut.resize(vOut.size()+1);
   LocalData_t& s =  vOut[vOut.size()-1];
   
   s.rank       = i;
   unsigned tmp = i*slbsz; 
   s.start = ( dlow >= tmp)?dlow:tmp; // max
   
   tmp = (i+1)*slbsz;
   unsigned end = ( dhigh <= tmp)?dhigh:tmp; // min
   
   s.len  = end - s.start;
   
   s.base = dd.base + ((s.start - dd.start) * gd.unit_size);
   
#ifdef DEBUG_INTERNAL
   fprintf(stderr, "   r1: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    } else {
      // it is a block-cyclic distribution

      unsigned dlbsz = blockSize(gd.len, dtopo.total, param);
      unsigned dtbsz = dlbsz * dtopo.total;
      unsigned nbbloc = NumberOfBlockProc(gd.len, dtopo.total, dlbsz, dd.rank);

      // for each dst block, find a src node
      for(unsigned b=0; b<nbbloc; b++) {
        unsigned gb = b * dtopo.total + dd.rank; // global block id
        unsigned srank = OwnerBlock(gb, stopo.total);

        vOut.resize(vOut.size()+1);
        LocalData_t& s = vOut[vOut.size()-1];

        s.rank  = srank;
        s.start = dtbsz*b + dd.rank*dlbsz;
        s.len   = BlockNumberOfElementProc(gd.len, dd.rank, dtopo.total, dlbsz, b);
        s.base  = dd.base + b * dlbsz * gd.unit_size;

#ifdef DEBUG_INTERNAL
        fprintf(stderr, "   r2: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    }
  }
}
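
In the non-cyclic branch above, the destination's element range [dd.start, dd.start + dd.len) is intersected with each fixed-size source block: the start is clipped to max(dlow, i*slbsz) and the end to min(dhigh, (i+1)*slbsz). Below is a small stand-alone illustration of that clipping with made-up sizes; the plain divisions stand in for getProcRangeInf/getProcRangeSup, whose definitions are not part of this file.

#include <algorithm>
#include <cstdio>

int main() {
  // Illustration: 100 elements over 4 source ranks (block size 25),
  // destination piece covering elements [30, 70).
  const unsigned slbsz = 25;
  const unsigned dlow = 30, dhigh = 70;

  const unsigned fpid = dlow / slbsz;         // first overlapping source rank -> 1
  const unsigned lpid = (dhigh - 1) / slbsz;  // last overlapping source rank  -> 2

  for (unsigned i = fpid; i <= lpid; ++i) {
    unsigned start = std::max(dlow,  i * slbsz);        // clip to block start
    unsigned end   = std::min(dhigh, (i + 1) * slbsz);  // clip to block end
    std::printf("recv %u elements [%u,%u) from source rank %u\n",
                end - start, start, end, i);
  }
  return 0;
}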


bool computeReceiveDataBlock1D ( vAbstrait *  sdata,
const PaCO::distLoc_t &  mode,
const unsigned  total,
const unsigned  rank,
const ParisBlock_param_t *  param,
Abstrait *  varray,
void *  comm 
)

Definition at line 151 of file ServerSide.cc.

References PaCO::ClientSide, PaCO::CommSide, computeReceiveDataBlock1DServer(), PaCO::none, and PaCO::ServerSide.

Referenced by ParisBlockMPC::computeReceive(), BasicDistributionLibrary::computeReceive(), and ParisBlock::computeReceive().

                                                {

#ifdef DEBUG_INTERNAL
  cerr << "In computeReceiveDataBlock1D...\n";
#endif  
  switch(mode) {
  case PaCO::none:
    cerr << "INTERNAL ERROR: " << __FILE__ << " " << __FUNCTION__ << endl;
    break;
  case PaCO::ClientSide:
#ifdef DEBUG_INTERNAL
    cerr << "Client Side case" <<endl;
#endif
    varray->CopyAndGetSequenceOwner((*vdarray)[0]);
#ifdef DEBUG_INTERNAL
    cerr << "Return true" << endl;
#endif
    return true;
    break; 
    
  case PaCO::ServerSide: {
    // data has been sent from source i to dest i % dtopo.total
    // so dest j has : j, j + dtopo.total, j+2*dtopo.total
    // so it may be 1 (at least activation) or more
    cerr << " vdaray size: " << vdarray->size() << endl;

//      unsigned stotal = (*vdarray)[0]->topo().total;
//      unsigned int nvdarray  = stotal/dtotal;
//      unsigned int remaining = (nvdarray*dtotal + drank < stotal)?1:0;
//      nvdarray += remaining;
//      if ( nvdarray == 0 ) nvdarray=1; // alway at least one

//      fprintf(stderr, "computeReceiveDataBlock1D: Node %d : got %d of %d data\n", drank, vdarray->size(), nvdarray);
    
//      if (vdarray->size() == nvdarray) {

#ifdef DEBUG_INTERNAL
    cerr << "DOING  LOCAL REDISTRIBUTION width drank: "<< drank << endl;
#endif
    Topology_t dtopo;
    dtopo.total   = dtotal;

    varray->topo() = (*vdarray)[0]->topo();
    varray->gd()   = (*vdarray)[0]->gd();
    varray->dist().length(1);
    varray->dist()[0].rank  = drank;

    computeReceiveDataBlock1DServer(vdarray, dtopo, param, varray, comm);

    return true;
    break;
  }
  case PaCO::CommSide:
    break;
  }
  return false;
}
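
For the ServerSide case, the comment above states that source i sends its piece to destination i % dtopo.total, so destination j holds the pieces of sources j, j + dtotal, j + 2*dtotal, and so on. The tiny sketch below simply enumerates that mapping for illustrative totals:

#include <cstdio>

int main() {
  const unsigned stotal = 5;   // illustrative number of source nodes
  const unsigned dtotal = 2;   // illustrative number of destination nodes

  for (unsigned drank = 0; drank < dtotal; ++drank) {
    std::printf("destination %u receives pieces from sources:", drank);
    for (unsigned srank = drank; srank < stotal; srank += dtotal)
      std::printf(" %u", srank);
    std::printf("\n");
  }
  return 0;
}
// Prints: destination 0 receives pieces from sources: 0 2 4
//         destination 1 receives pieces from sources: 1 3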


void computeSendBlock1D ( const GlobalData_t &  gd,
const LocalData_t &  sd,
const Topology_t &  stopo,
const Topology_t &  dtopo,
const ParisBlock_param_t *  param,
vector< LocalData_t > &  vOut 
)

Definition at line 38 of file Schedule.cc.

References BlockNumberOfElementProc(), blockSize(), computeBlockBounds(), getProcRangeInf(), getProcRangeSup(), NumberOfBlockProc(), and OwnerBlock().

Referenced by computeReceiveDataBlock1DServer(), and computeSendDataBlock1DClient().

                                                                     {
  
#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Send Schedule--------------------\n";

  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
  fprintf(stderr, "gd.len %ld\tgd.cyclic: %ld\tsd.start %d\tsd.len %d\n", gd.len, gd.cyclic, sd.start, sd.len);
#endif

  if (stopo.total == dtopo.total) {
    //    vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
    vOut.push_back(sd);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, "  rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
#endif
  } else {
    // Append mode
    vOut.reserve(vOut.size() + dtopo.total); // in block mode, at most one msg to each dest node
    
    unsigned slbsz  = blockSize(gd.len, stopo.total, param);
    
    if (gd.cyclic == 0) {
      // that's a standard block redistribution

      unsigned long slow, shigh;
      computeBlockBounds(&slow, &shigh, gd.len, sd.rank, stopo.total, slbsz, 0);
    
      unsigned dlbsz = blockSize(gd.len, dtopo.total, param);

      unsigned fpid, lpid;
      fpid = getProcRangeInf(slow,  dlbsz);
      lpid = getProcRangeSup(shigh, dlbsz);
    
#ifdef DEBUG_INTERNAL
      fprintf(stderr, "  loop from %d to %d width dtotal: %ld\n", fpid, lpid, dtopo.total);
#endif
    
      // for each dest bloc
      for(unsigned i=fpid; i <= lpid; i++) {           

   vOut.resize(vOut.size()+1);
   LocalData_t& s =  vOut[vOut.size()-1];
   
   s.rank    = i;
   unsigned tmp = i*dlbsz; 
   s.start = ( slow >= tmp)?slow:tmp; // max
   
   tmp = (i+1)*dlbsz;
   unsigned end = ( shigh <= tmp)?shigh:tmp; // min
      
   s.len   = end - s.start;
   
   s.base = sd.base + ((s.start - sd.start) * gd.unit_size);

#ifdef DEBUG_INTERNAL
   fprintf(stderr, "   s1: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }

    } else {
      // it is a block-cyclic distribution

      unsigned stbsz  = slbsz * stopo.total;
      unsigned nbbloc = NumberOfBlockProc(gd.len, stopo.total, slbsz, sd.rank);

      // for each src block, find a dst node
      for(unsigned b=0; b<nbbloc; b++) {
        unsigned gb = b * stopo.total + sd.rank; // global block id
        unsigned drank = OwnerBlock(gb, dtopo.total);

        vOut.resize(vOut.size()+1);
        LocalData_t& s = vOut[vOut.size()-1];

        s.rank  = drank;
        s.start = (stbsz*b) + (sd.rank*slbsz);
        s.len   = BlockNumberOfElementProc(gd.len, sd.rank, stopo.total, slbsz, b);
        s.base  = sd.base + ( b * slbsz * gd.unit_size );

#ifdef DEBUG_INTERNAL
        fprintf(stderr, "   s2: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    }
  }

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Send Schedule-------------------- done\n";
#endif
}
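
In the block-cyclic branch, local block b of source rank sd.rank is global block gb = b*stopo.total + sd.rank, and OwnerBlock(gb, dtopo.total) chooses the destination rank for it. The sketch below only illustrates the numbering; the modulo ownership it uses is an assumption made for the example, since OwnerBlock's actual definition lives outside this file.

#include <cstdio>

int main() {
  const unsigned stotal = 3;   // illustrative source topology size
  const unsigned dtotal = 2;   // illustrative destination topology size
  const unsigned nbbloc = 2;   // illustrative number of local blocks per source

  for (unsigned srank = 0; srank < stotal; ++srank)
    for (unsigned b = 0; b < nbbloc; ++b) {
      unsigned gb    = b * stotal + srank;  // global block id, as in the loop above
      unsigned drank = gb % dtotal;         // assumed stand-in for OwnerBlock(gb, dtotal)
      std::printf("source %u, local block %u -> global block %u -> destination %u\n",
                  srank, b, gb, drank);
    }
  return 0;
}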


void computeSendDataBlock1D ( const GlobalData_t &  gd,
const LocalData_t &  sd,
const Topology_t &  stopo,
const Topology_t &  dtopo,
const ParisBlock_param_t *  param,
vAbstrait &  vdarray,
vector< unsigned > &  destid,
const PaCO::distLoc_t &  mode,
void *  comm 
)

Definition at line 215 of file ClientSide.cc.

References PaCO::ClientSide, PaCO::CommSide, computeSendDataBlock1DClient(), computeSendDataBlock1DComm(), computeSendDataBlock1DServer(), PaCO::none, and PaCO::ServerSide.

Referenced by ParisBlockMPC::computeSend(), BasicDistributionLibrary::computeSend(), and ParisBlock::computeSend().

                                                    {

  switch(mode) {
  case PaCO::none: cerr << "INTERNAL ERROR: " << __FILE__ << " " << __FUNCTION__ << endl; break;
  case PaCO::ClientSide: computeSendDataBlock1DClient(gd, sd, param, stopo, dtopo, vdarray, destid, comm); break;
  case PaCO::ServerSide: computeSendDataBlock1DServer(gd, sd, stopo, dtopo, vdarray, destid); break;
  case PaCO::CommSide:   computeSendDataBlock1DComm(); break;
  }
  
}
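
computeSendDataBlock1D is the mode dispatcher used by the computeSend entry points listed under "Referenced by": the caller picks a PaCO::distLoc_t and the corresponding client-, server- or comm-side routine does the work. A hedged call sketch, with all arguments assumed to be prepared elsewhere:

// Hypothetical call site: choose the location mode and let the dispatcher
// route to the matching implementation.
computeSendDataBlock1D(gd, sd, stopo, dtopo, param,
                       vdarray, destid, PaCO::ClientSide, &comm);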


void computeSendRemoteDataBlock1D ( const GlobalData_t &  gd,
const LocalData_t &  sd,
const Topology_t &  stopo,
const Topology_t &  dtopo,
vector< LocalData_t > &  vOut,
const PaCO::distLoc_t  mode 
)
void doSchedule ( const GlobalData_t &  gd,
const LocalData_t &  ld,
const Topology_t &  ctopo,
vector< LocalData_t > &  sched_send,
vector< LocalData_t > &  sched_recv,
void *  comm 
)

Definition at line 227 of file Schedule.cc.

                                                                                    {
  
#ifdef DEBUG_INTERNAL
  cerr << "\nIn doSchedule--------------------\n";
#endif
  
  MPI_Comm mpi_comm = *(MPI_Comm*) comm;

#ifdef DEBUG_COMM
  fprintf(stderr," MPI_COMM_WORLD=%d mpi_comm=%d\n", MPI_COMM_WORLD, mpi_comm);
#endif 

  if (sched_send.size() || sched_recv.size()) {

    MPI_Request sreq[sched_send.size()];
    MPI_Request rreq[sched_recv.size()];
    unsigned si, ri;
    si=0;
    ri=0;
    
    MPI_Status sstat[sched_send.size()];
    MPI_Status rstat[sched_recv.size()];

    vector<LocalData_t*> local_recv;
    vector<LocalData_t*> local_send;
    
    local_recv.clear();
    local_send.clear();

    // Sorting data

    if (sched_send.size()) std::stable_sort(sched_send.begin(), sched_send.end(), cmp_rank);
    if (sched_recv.size()) std::stable_sort(sched_recv.begin(), sched_recv.end(), cmp_rank);

    // Sending data
        
    // Post Asynchronous MPI receive
#ifdef DEBUG_COMM
    cerr << "    #sched_recv: " << sched_recv.size() << endl;
#endif
    for(unsigned i=0; i < sched_recv.size(); i++) {
      unsigned from = getProcId(sched_recv[i].rank, ctopo);
      if (from == ld.rank) {
#ifdef DEBUG_COMM
   fprintf(stderr, "    recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
      sched_recv[i].start, sched_recv[i].len, from);
#endif
   local_recv.push_back(&sched_recv[i]);
      } else {
#ifdef DEBUG_COMM
   fprintf(stderr, "   recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
      sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
#endif
      
   int err = MPI_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size, 
             MPI_BYTE, from, 51, mpi_comm, &rreq[ri++]);
   if (err!= MPI_SUCCESS) {
     cerr << "EROR IN MPI_Irecv: return value is "<<err<<endl;
   }
      }
    }
    
    // Send data via MPI
#ifdef DEBUG_COMM
    cerr << "    #sched_send: " << sched_send.size() << endl;
#endif
    for(unsigned i=0; i < sched_send.size(); i++) {
      unsigned to = getProcId(sched_send[i].rank, ctopo);
      if (to == ld.rank) {
#ifdef DEBUG_COMM
   fprintf(stderr, "    send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
      sched_send[i].start, sched_send[i].len, to);
#endif
   local_send.push_back(&sched_send[i]);
      } else {
#ifdef DEBUG_COMM
   fprintf(stderr, "    send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
      sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
#endif
      
   int err = MPI_Isend(sched_send[i].base, sched_send[i].len*gd.unit_size, 
             MPI_BYTE, to, 51, mpi_comm, &sreq[si++]);      
   if (err!= MPI_SUCCESS) {
     cerr << "EROR IN MPI_Isend: return value is "<<err<<endl;
   }
      }
    }
    
    // Do local communication via memcpy
    if (local_recv.size() != local_send.size()) {
      cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
    }
    for(unsigned i=0; i < local_recv.size(); i++) {
      if (local_recv[i]->len != local_send[i]->len) {
   cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
      }
#ifdef DEBUG_COMM
      fprintf(stderr, "    local: scheds no=%d start=%d len=%d\n", i,
         sched_send[i].start, sched_send[i].len);
#endif
      memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len*gd.unit_size);
    }


    // Wait all receive & send
#ifdef DEBUG_INTERNAL
    cerr << "WAITING local communications to end...\n";
#endif

    int err;
    err = MPI_Waitall(si, sreq, sstat);
    if (err!= MPI_SUCCESS) {
      cerr << "EROR IN MPI_WaitAll for send: return value is "<<err<<endl;
      }
    err = MPI_Waitall(ri, rreq, rstat);
    if (err!= MPI_SUCCESS) {
      cerr << "EROR IN MPI_WaitAll for recv: return value is "<<err<<endl;
    }
#ifdef DEBUG_INTERNAL
    cerr << "WAITING local communications to end...ok \n";
#endif
  }
}
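
doSchedule stable-sorts both schedules with cmp_rank before posting the non-blocking operations; cmp_rank itself is not defined on this page. A plausible comparator consistent with that use, given here as an assumption rather than the real definition, orders entries by peer rank; stability then presumably keeps the pieces exchanged between a given pair of nodes in matching order on both sides, since every message uses tag 51.

// Hypothetical sketch of the comparator passed to std::stable_sort; the
// actual cmp_rank lives elsewhere in the library and may differ.
static bool cmp_rank(const LocalData_t& a, const LocalData_t& b) {
  return a.rank < b.rank;
}
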
unsigned nbofPart ( const PaCO::distLoc_t &  mode,
const unsigned  stotal,
const unsigned  dtotal,
const unsigned  drank 
)

Definition at line 120 of file ServerSide.cc.

Referenced by ParisBlockMPC::computeReceive(), BasicDistributionLibrary::computeReceive(), and ParisBlock::computeReceive().

                                                                                                                  {
  switch(mode) {
  case PaCO::none: cerr << "INTERNAL ERROR: " << __FILE__ << " " << __FUNCTION__ << endl; return 0;
  case PaCO::ClientSide: return 1; 
  case PaCO::ServerSide:
    {
      unsigned int nvdarray  = stotal/dtotal;
      unsigned int remaining = (nvdarray*dtotal + drank < stotal)?1:0;
      nvdarray += remaining;
      if ( nvdarray == 0 ) return 1; // always at least one
      return nvdarray;
    }
  case PaCO::CommSide: cerr << "INTERNAL ERROR: not yet implemented in " << __FILE__ << " " << __FUNCTION__ << endl; return 0;
  }
  return 0;
}
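
Worked example: with stotal = 5 and dtotal = 2, nvdarray = 5/2 = 2; for drank = 0 the test 2*2 + 0 = 4 < 5 holds, so one extra part is added, giving 3, while for drank = 1 the test 2*2 + 1 = 5 < 5 fails, giving 2. The counts sum to 5, one part per source node.
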
unsigned posofPart ( const PaCO::distLoc_t &  mode,
const unsigned  dtotal,
const unsigned  srank 
)

Definition at line 138 of file ServerSide.cc.

Referenced by ParisBlockMPC::computeReceive(), BasicDistributionLibrary::computeReceive(), and ParisBlock::computeReceive().

                                                                                            {
  switch(mode) {
  case PaCO::none: cerr << "INTERNAL ERROR: " << __FILE__ << " " << __FUNCTION__ << endl; return 0;
  case PaCO::ClientSide: return 0; 
  case PaCO::ServerSide: return srank / dtotal;
  case PaCO::CommSide: cerr << "INTERNAL ERROR: not yet implemented in " << __FILE__ << " " << __FUNCTION__ << endl; return 0;
  }
  return 0;
}
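
Continuing the same example in ServerSide mode (dtotal = 2), posofPart maps source ranks 0..4 to positions 0/2 = 0, 1/2 = 0, 2/2 = 1, 3/2 = 1 and 4/2 = 2: destination 0 stores the parts coming from sources 0, 2, 4 at positions 0, 1, 2, and destination 1 stores those from sources 1, 3 at positions 0, 1, matching the part counts returned by nbofPart.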