PaCO++  0.05
Schedule.cc
/* Padico Advanced Examples
 * author: Christian Pérez
 */

#include <stdio.h>
#include <string.h>   /* memcpy */
#include <mpi.h>

#include <algorithm>
#include <vector>
#include <iostream>

#include "Schedule.h"
#include "Internal.h"
#include "DistributionBloc.h"

using namespace std;

#undef DEBUG_INTERNAL
#undef DEBUG_COMM

/************************************************************/
/************************************************************/
/************************************************************/

// Comparison function for sorting schedule entries by ascending start offset
bool cmp_rank(const LocalData_t& a, const LocalData_t& b)
{
  return a.start < b.start;
}
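
// doSchedule() stable-sorts both the send and the receive schedule with this
// comparator, so that purely local send and receive entries end up paired
// index by index before the memcpy loop.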

/************************************************************/
/************************************************************/
/************************************************************/

// vOut represents what localData/stopo have to send to nodes of dtopo (vOut[].rank is in dtopo space)
void computeSendBlock1D(const GlobalData_t& gd, const LocalData_t& sd,
                        const Topology_t& stopo, const Topology_t& dtopo,
                        const ParisBlock_param_t* param, vector<LocalData_t>& vOut) {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Send Schedule--------------------\n";

  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n", stopo.total, dtopo.total);
  fprintf(stderr, "gd.len %ld\tgd.cyclic: %ld\tsd.start %d\tsd.len %d\n", gd.len, gd.cyclic, sd.start, sd.len);
#endif

  if (stopo.total == dtopo.total) {
    // Same topology on both sides: the local piece is sent as-is
    vOut.push_back(sd);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, "  rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
#endif
  } else {
    // Append mode
    vOut.reserve(vOut.size() + dtopo.total); // in block mode, at most one msg to each dest node

    unsigned slbsz = blockSize(gd.len, stopo.total, param);

    if (gd.cyclic == 0) {
      // standard block redistribution

      unsigned long slow, shigh;
      computeBlockBounds(&slow, &shigh, gd.len, sd.rank, stopo.total, slbsz, 0);

      unsigned dlbsz = blockSize(gd.len, dtopo.total, param);

      unsigned fpid, lpid;
      fpid = getProcRangeInf(slow,  dlbsz);
      lpid = getProcRangeSup(shigh, dlbsz);

#ifdef DEBUG_INTERNAL
      fprintf(stderr, "  loop from %d to %d with dtotal: %ld\n", fpid, lpid, dtopo.total);
#endif

      // for each dest block
      for (unsigned i = fpid; i <= lpid; i++) {

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank = i;
        unsigned tmp = i * dlbsz;
        s.start = (slow >= tmp) ? slow : tmp; // max

        tmp = (i + 1) * dlbsz;
        unsigned end = (shigh <= tmp) ? shigh : tmp; // min

        s.len = end - s.start;

        s.base = sd.base + ((s.start - sd.start) * gd.unit_size);

#ifdef DEBUG_INTERNAL
        fprintf(stderr, "   s1: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }

    } else {
      // block-cyclic distribution

      unsigned stbsz  = slbsz * stopo.total;
      unsigned nbbloc = NumberOfBlockProc(gd.len, stopo.total, slbsz, sd.rank);

      // for each src block, find a dst node
      for (unsigned b = 0; b < nbbloc; b++) {
        unsigned gb = b * stopo.total + sd.rank; // global block id
        unsigned drank = OwnerBlock(gb, dtopo.total);

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank  = drank;
        s.start = (stbsz * b) + (sd.rank * slbsz);
        s.len   = BlockNumberOfElementProc(gd.len, sd.rank, stopo.total, slbsz, b);
        s.base  = sd.base + (b * slbsz * gd.unit_size);

#ifdef DEBUG_INTERNAL
        fprintf(stderr, "   s2: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    }
  }

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Send Schedule-------------------- done\n";
#endif
}
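
// A hedged worked example of the plain-block case above. The block helpers are
// defined elsewhere, so their exact semantics are an assumption here: suppose
// blockSize(len, n, param) returns ceil(len/n) for default parameters, and
// getProcRangeInf(x, bsz) = x / bsz, getProcRangeSup(x, bsz) = (x - 1) / bsz.
//   gd.len = 12, stopo.total = 3  -> slbsz = 4, source rank 1 owns [4, 8)
//   dtopo.total = 4               -> dlbsz = 3, fpid = 4/3 = 1, lpid = 7/3 = 2
//   i = 1: start = max(4, 3) = 4, end = min(8, 6) = 6 -> send [4, 6) to dest rank 1
//   i = 2: start = max(4, 6) = 6, end = min(8, 9) = 8 -> send [6, 8) to dest rank 2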

// vOut represents what localData/dtopo have to receive from nodes of stopo (vOut[].rank is in stopo space)
void computeReceiveBlock1D(const GlobalData_t& gd, const LocalData_t& dd,
                           const Topology_t& stopo, const Topology_t& dtopo,
                           const ParisBlock_param_t* param, vector<LocalData_t>& vOut) {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Receive Schedule--------------------\n";

  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n", stopo.total, dtopo.total);
  fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
#endif

  if (stopo.total == dtopo.total) {
    vOut.push_back(dd);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
#endif
  } else {
    // Append mode
    vOut.reserve(vOut.size() + stopo.total); // in block mode, at most one msg from each src node

    unsigned slbsz = blockSize(gd.len, stopo.total, param);

    if (gd.cyclic == 0) {

      unsigned long dlow  = dd.start;
      unsigned long dhigh = dlow + dd.len;

      unsigned fpid, lpid;
      fpid = getProcRangeInf(dlow,  slbsz);
      lpid = getProcRangeSup(dhigh, slbsz);

#ifdef DEBUG_INTERNAL
      fprintf(stderr, "  loop from %d to %d with stotal: %ld\n", fpid, lpid, stopo.total);
#endif

      // for each src block
      for (unsigned i = fpid; i <= lpid; i++) {

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank = i;
        unsigned tmp = i * slbsz;
        s.start = (dlow >= tmp) ? dlow : tmp; // max

        tmp = (i + 1) * slbsz;
        unsigned end = (dhigh <= tmp) ? dhigh : tmp; // min

        s.len = end - s.start;

        s.base = dd.base + ((s.start - dd.start) * gd.unit_size);

#ifdef DEBUG_INTERNAL
        fprintf(stderr, "   r1: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    } else {
      // block-cyclic distribution

      unsigned dlbsz  = blockSize(gd.len, dtopo.total, param);
      unsigned dtbsz  = dlbsz * dtopo.total;
      unsigned nbbloc = NumberOfBlockProc(gd.len, dtopo.total, dlbsz, dd.rank);

      // for each dst block, find a src node
      for (unsigned b = 0; b < nbbloc; b++) {
        unsigned gb = b * dtopo.total + dd.rank; // global block id
        unsigned srank = OwnerBlock(gb, stopo.total);

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank  = srank;
        s.start = dtbsz * b + dd.rank * dlbsz;
        s.len   = BlockNumberOfElementProc(gd.len, dd.rank, dtopo.total, dlbsz, b);
        s.base  = dd.base + b * dlbsz * gd.unit_size;

#ifdef DEBUG_INTERNAL
        fprintf(stderr, "   r2: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    }
  }
}
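
// Mirror of the hedged worked example after computeSendBlock1D (same assumed
// semantics for blockSize and the getProcRange* helpers): with gd.len = 12,
// stopo.total = 3 (slbsz = 4) and dtopo.total = 4 (dlbsz = 3), destination
// rank 1 owns [3, 6), so fpid = 3/4 = 0 and lpid = 5/4 = 1:
//   i = 0: start = max(3, 0) = 3, end = min(6, 4) = 4 -> receive [3, 4) from src rank 0
//   i = 1: start = max(3, 4) = 4, end = min(6, 8) = 6 -> receive [4, 6) from src rank 1
// which matches the sender side, where source rank 1 sends [4, 6) to destination rank 1.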

/************************************************************/
/************************************************************/
/************************************************************/

// gd/ld: global description and local piece of the data on this node
// ctopo is the current topo
// sched_send / sched_recv: send and receive schedules (as computed above)
// comm: pointer to the MPI communicator to use
void doSchedule(const GlobalData_t& gd, const LocalData_t& ld, const Topology_t& ctopo,
                vector<LocalData_t>& sched_send, vector<LocalData_t>& sched_recv, void* comm) {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn doSchedule--------------------\n";
#endif

  MPI_Comm mpi_comm = *(MPI_Comm*) comm;

#ifdef DEBUG_COMM
  fprintf(stderr, " MPI_COMM_WORLD=%d mpi_comm=%d\n", MPI_COMM_WORLD, mpi_comm);
#endif

  if (sched_send.size() || sched_recv.size()) {

    // std::vector rather than variable-length arrays: VLAs are a compiler
    // extension in C++, and one of the two sizes may be zero
    vector<MPI_Request> sreq(sched_send.size());
    vector<MPI_Request> rreq(sched_recv.size());
    unsigned si = 0;
    unsigned ri = 0;

    vector<MPI_Status> sstat(sched_send.size());
    vector<MPI_Status> rstat(sched_recv.size());

    vector<LocalData_t*> local_recv;
    vector<LocalData_t*> local_send;

    // Sort both schedules by start offset so that local entries pair up
    if (sched_send.size()) std::stable_sort(sched_send.begin(), sched_send.end(), cmp_rank);
    if (sched_recv.size()) std::stable_sort(sched_recv.begin(), sched_recv.end(), cmp_rank);

    // Post asynchronous MPI receives
#ifdef DEBUG_COMM
    cerr << "    #sched_recv: " << sched_recv.size() << endl;
#endif
    for (unsigned i = 0; i < sched_recv.size(); i++) {
      unsigned from = getProcId(sched_recv[i].rank, ctopo);
      if (from == ld.rank) {
#ifdef DEBUG_COMM
        fprintf(stderr, "    recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
                sched_recv[i].start, sched_recv[i].len, from);
#endif
        local_recv.push_back(&sched_recv[i]);
      } else {
#ifdef DEBUG_COMM
        fprintf(stderr, "   recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
                sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
#endif

        int err = MPI_Irecv(sched_recv[i].base, sched_recv[i].len * gd.unit_size,
                            MPI_BYTE, from, 51, mpi_comm, &rreq[ri++]);
        if (err != MPI_SUCCESS) {
          cerr << "ERROR IN MPI_Irecv: return value is " << err << endl;
        }
      }
    }

    // Send data via MPI
#ifdef DEBUG_COMM
    cerr << "    #sched_send: " << sched_send.size() << endl;
#endif
    for (unsigned i = 0; i < sched_send.size(); i++) {
      unsigned to = getProcId(sched_send[i].rank, ctopo);
      if (to == ld.rank) {
#ifdef DEBUG_COMM
        fprintf(stderr, "    send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
                sched_send[i].start, sched_send[i].len, to);
#endif
        local_send.push_back(&sched_send[i]);
      } else {
#ifdef DEBUG_COMM
        fprintf(stderr, "    send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
                sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
#endif

        int err = MPI_Isend(sched_send[i].base, sched_send[i].len * gd.unit_size,
                            MPI_BYTE, to, 51, mpi_comm, &sreq[si++]);
        if (err != MPI_SUCCESS) {
          cerr << "ERROR IN MPI_Isend: return value is " << err << endl;
        }
      }
    }

    // Do local communication via memcpy
    if (local_recv.size() != local_send.size()) {
      cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
    }
    for (unsigned i = 0; i < local_recv.size(); i++) {
      if (local_recv[i]->len != local_send[i]->len) {
        cerr << "Error: local recv & send have different len for i= " << i << " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
      }
#ifdef DEBUG_COMM
      fprintf(stderr, "    local: scheds no=%d start=%d len=%d\n", i,
              sched_send[i].start, sched_send[i].len);
#endif
      memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len * gd.unit_size);
    }

    // Wait for all receives & sends to complete
#ifdef DEBUG_INTERNAL
    cerr << "WAITING for communications to end...\n";
#endif

    int err;
    err = MPI_Waitall(si, sreq.data(), sstat.data());
    if (err != MPI_SUCCESS) {
      cerr << "ERROR IN MPI_Waitall for send: return value is " << err << endl;
    }
    err = MPI_Waitall(ri, rreq.data(), rstat.data());
    if (err != MPI_SUCCESS) {
      cerr << "ERROR IN MPI_Waitall for recv: return value is " << err << endl;
    }
#ifdef DEBUG_INTERNAL
    cerr << "WAITING for communications to end...ok \n";
#endif
  }
}
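
/************************************************************/

// Hedged usage sketch: how the three routines above are typically chained on
// one node of a redistribution. Type and field names are taken from their use
// above (their definitions live in Schedule.h / Internal.h); the three
// LocalData_t descriptors (ldSrc, ldDst, ldCur) and the way ranks map between
// stopo/dtopo and ctopo are illustrative assumptions, not a verified API.
static void exampleRedistribute(const GlobalData_t& gd,
                                const LocalData_t& ldSrc,  // local piece in the source distribution
                                const LocalData_t& ldDst,  // local piece in the destination distribution
                                const LocalData_t& ldCur,  // descriptor whose rank is in ctopo space
                                const Topology_t& stopo, const Topology_t& dtopo,
                                const Topology_t& ctopo,
                                const ParisBlock_param_t* param)
{
  vector<LocalData_t> sched_send, sched_recv;

  // What this node has to send (entry ranks are in dtopo space)...
  computeSendBlock1D(gd, ldSrc, stopo, dtopo, param, sched_send);
  // ...and what it has to receive (entry ranks are in stopo space).
  computeReceiveBlock1D(gd, ldDst, stopo, dtopo, param, sched_recv);

  // Execute both schedules over an MPI communicator.
  MPI_Comm comm = MPI_COMM_WORLD;
  doSchedule(gd, ldCur, ctopo, sched_send, sched_recv, &comm);
}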