PaCO++  0.05
Schedule.cc
Go to the documentation of this file.
00001 /* Padico Advanced Examples
00002  * author: Christian Pérez
00003  */
00004 
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <Padico/MPCircuit.h>

#include "Schedule.h"
#include "Internal.h"
#include "DistributionBloc.h"
00011 
00012 //#define NO_COM
00013 
00014 #undef STANDALONE_FILE
00015 
00016 #define DEBUG_INTERNAL
00017 #define DEBUG_COMM
00018 
00019 #ifdef NO_COM
// Fake allocator used when NO_COM is defined: hands out distinct addresses
// starting at 0x1000000 without allocating anything. The returned addresses
// are not backed by memory and must never be dereferenced or freed.
// Each call reserves at least sz bytes of fake address space, rounded up to
// a multiple of 0x1000 (a request of 0 to 0x1000 bytes advances by 0x1000),
// so consecutive fake buffers never overlap.
inline char* mymalloc(unsigned sz) {
  // uintptr_t: an integer wide enough to round-trip a pointer (a plain
  // unsigned is truncated on 64-bit platforms).
  static uintptr_t base = 0x1000000;
  const uintptr_t step = 0x1000;

  uintptr_t span = ((uintptr_t) sz + step - 1) / step * step;
  if (span == 0)
    span = step;

  uintptr_t ret = base;
  base += span;
  return (char*) ret;
}
00026 #endif
00027 
00028 #ifdef STANDALONE_FILE
00029 #include "../Generated/Concret.h"
00030 #include "../Generated/XServiceType.h"
00031 #endif
00032 
00033 
00034 /************************************************************/
00035 /************************************************************/
00036 /************************************************************/
00037 
00038 // vOut represents what localData/stopo have to send to nodes of dtopo (vOut[].rank is in dtopo space)
00039 void computeSendBlock1D(const GlobalData_t& gd, const LocalData_t &sd, 
00040          const Topology_t &stopo, const Topology_t &dtopo,
00041          vector<LocalData_t>& vOut) {
00042   
00043 #ifdef DEBUG_INTERNAL
00044   cerr << "\nIn compute Send Schedule--------------------\n";
00045 
00046   fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
00047   fprintf(stderr, "gd.len %ld\tsd.start %d\tsd.len %d\n",gd.len, sd.start, sd.len);
00048 #endif
00049 
00050   if (stopo.total == dtopo.total) {
00051     //    vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
00052     vOut.push_back(sd);
00053 #ifdef DEBUG_INTERNAL
00054     fprintf(stderr, "  rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
00055 #endif
00056   } else {
00057     // Append mode
00058     vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node
00059 
00060     unsigned dbsz = blockSize(gd.len, dtopo.total);
00061     
00062     unsigned long slow  = sd.start;
00063     unsigned long shigh = slow + sd.len;
00064     
00065     unsigned fpid, lpid;
00066     fpid = getProcRangeInf(slow,  dbsz);
00067     lpid = getProcRangeSup(shigh, dbsz);
00068     
00069 #ifdef DEBUG_INTERNAL
00070     fprintf(stderr, "  loop from %d to %d width dtotal: %ld\n", fpid, lpid, dtopo.total);
00071 #endif
00072     
00073     // for each dest bloc
00074     for(unsigned i=fpid; i <= lpid; i++) {           
00075 
00076       vOut.resize(vOut.size()+1);
00077       LocalData_t& s =  vOut[vOut.size()-1];
00078    
00079       s.rank    = i;
00080       unsigned tmp = i*dbsz;  
00081       s.start = ( slow >= tmp)?slow:tmp; // max
00082       
00083       tmp = (i+1)*dbsz;
00084       unsigned end = ( shigh <= tmp)?shigh:tmp; // min
00085       
00086       s.len   = end - s.start;
00087 
00088       s.base = sd.base + ((s.start - sd.start) * gd.unit_size);
00089 
00090 #ifdef DEBUG_INTERNAL
00091       fprintf(stderr, "    s: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
00092 #endif
00093     }
00094   }
00095 #ifdef DEBUG_INTERNAL
00096   cerr << "\nIn compute Send Schedule-------------------- done\n";
00097 #endif
00098 }
00099 
00103 
00104 // vOut represents what localData/dtopo have to receive from nodes of stopo (vOut[].rank is in stopo space)
00105 void computeReceiveBlock1D(const GlobalData_t& gd, const LocalData_t &dd, 
00106             const Topology_t &stopo, const Topology_t &dtopo,
00107             vector<LocalData_t>& vOut) {
00108 
00109 #ifdef DEBUG_INTERNAL
00110   cerr << "\nIn compute Receive Schedule--------------------\n";
00111 
00112   fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total);
00113   fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
00114 
00115 #endif
00116 
00117   if (stopo.total == dtopo.total) {
00118     vOut.push_back(dd);
00119 #ifdef DEBUG_INTERNAL
00120     fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
00121 #endif
00122   } else {
00123     // Apend mode
00124     vOut.reserve(vOut.size()+stopo.total); // in bloc mode, at most one msg from each src node
00125 
00126     unsigned sbsz = blockSize(gd.len, stopo.total);
00127     
00128     unsigned long dlow  = dd.start;
00129     unsigned long dhigh = dlow + dd.len;
00130     
00131     unsigned fpid, lpid;
00132     fpid = getProcRangeInf(dlow,  sbsz);
00133     lpid = getProcRangeSup(dhigh, sbsz);
00134     
00135 #ifdef DEBUG_INTERNAL
00136     fprintf(stderr, "  loop from %d to %d width stotal: %ld\n", fpid, lpid, stopo.total);
00137 #endif
00138 
00139     // for each dest bloc
00140     for(unsigned i=fpid; i <= lpid; i++) {
00141       
00142       vOut.resize(vOut.size()+1);
00143       LocalData_t& s =  vOut[vOut.size()-1];
00144    
00145       s.rank       = i;
00146       unsigned tmp = i*sbsz;  
00147       s.start = ( dlow >= tmp)?dlow:tmp; // max
00148    
00149       tmp = (i+1)*sbsz;
00150       unsigned end = ( dhigh <= tmp)?dhigh:tmp; // min
00151       
00152       s.len  = end - s.start;
00153 
00154       s.base = dd.base + ((s.start - dd.start) * gd.unit_size);
00155 
00156 #ifdef DEBUG_INTERNAL
00157       fprintf(stderr, "    r: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
00158 #endif
00159     }
00160   }
00161 }
00162 
00163 /************************************************************/
00164 /************************************************************/
00165 /************************************************************/
00166 
00167 // ctopo is the current topo
00168 // sdata: a pointer to the source data (pointer to local data)
00169 // ddata: a pointer to the base address of of reveive (pointer to local data)
00170 void doSchedule(const GlobalData_t& gd, const LocalData_t& ld, const Topology_t &ctopo,
00171       vector<LocalData_t>& sched_send, vector<LocalData_t>& sched_recv, void* comm) {
00172   
00173   cerr << "\nIn doSchedule--------------------\n";
00174 
00175   padico_mpcircuit_t schd_mpc = (padico_mpcircuit_t) comm;
00176 
00177   if (sched_send.size() || sched_recv.size()) {
00178 
00179 #ifndef NO_COM
00180     void* rreq[sched_recv.size()];
00181     unsigned ri;
00182     ri=0;
00183 #endif
00184 
00185     vector<LocalData_t*> local_recv;
00186     vector<LocalData_t*> local_send;
00187     
00188     local_recv.clear();
00189     local_send.clear();
00190 
00192     // Sending data
00193         
00194     // Post Asynchronous MPCircuit receive
00195 #ifdef DEBUG_COM
00196     cerr << "    #sched_recv: " << sched_recv.size() << endl;
00197 #endif
00198     for(unsigned i=0; i < sched_recv.size(); i++) {
00199       unsigned from = getProcId(sched_recv[i].rank, ctopo);
00200       if (from == ld.rank) {
00201 #ifdef DEBUG_COMM
00202    fprintf(stderr, "    recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
00203       sched_recv[i].start, sched_recv[i].len, from);
00204 #endif
00205    local_recv.push_back(&sched_recv[i]);
00206       } else {
00207 #ifdef DEBUG_COMM
00208    fprintf(stderr, "   recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
00209       sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
00210 #endif
00211       
00212 #ifndef NO_COM
00213    rreq[ri++] = padico_mpcircuit_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size, 
00214                    from, 51, schd_mpc );
00215 #endif
00216       }
00217     }
00218     
00219     // Send data via MPCircuit
00220 #ifdef DEBUG_COMM
00221     cerr << "    #sched_send: " << sched_send.size() << endl;
00222 #endif
00223     for(unsigned i=0; i < sched_send.size(); i++) {
00224       unsigned to = getProcId(sched_send[i].rank, ctopo);
00225       if (to == ld.rank) {
00226 #ifdef DEBUG_COMM
00227    fprintf(stderr, "    send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
00228       sched_send[i].start, sched_send[i].len, to);
00229 #endif
00230    local_send.push_back(&sched_send[i]);
00231       } else {
00232 #ifdef DEBUG_COMM
00233    fprintf(stderr, "    send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
00234       sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
00235 #endif
00236       
00237 #ifndef NO_COM
00238    padico_mpcircuit_send(sched_send[i].base, sched_send[i].len*gd.unit_size, 
00239                   to, 51, schd_mpc);
00240 #endif
00241       }
00242     }
00243     
00244     // Do local communication vie memcpy
00245     if (local_recv.size() != local_send.size()) {
00246       cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl;
00247     }
00248     for(unsigned i=0; i < local_recv.size(); i++) {
00249       if (local_recv[i]->len != local_send[i]->len) {
00250    cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
00251       }
00252 #ifdef DEBUG_COMM
00253       fprintf(stderr, "    local: scheds no=%d start=%d len=%d\n", i,
00254          sched_send[i].start, sched_send[i].len);
00255 #endif
00256 #ifndef NO_COM
00257       memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len*gd.unit_size);
00258 #endif
00259     }
00260 
00261 
00262     // Wait all receive & send
00263 #ifndef NO_COM
00264 #ifdef DEBUG_INTERNAL
00265     cerr << "WAITING local communications to end...\n";
00266 #endif
00267 
00268     padico_mpcircuit_waitAll(rreq, ri);
00269 #ifdef DEBUG_INTERNAL
00270     cerr << "WAITING local communications to end...ok \n";
00271 #endif
00272 #endif
00273   }
00274 }
00275 
00279 
00280 #ifdef STANDALONE_FILE
00281 int simSendDataBlock1D(unsigned int glen, int total, int rank, int dtotal, const PaCO::distLoc_t& mode) {
00282 
00283   GlobalData_t gd;
00284   LocalData_t  sd;
00285   Topology_t   stopo, dtopo;
00286   
00287   vConcret vdarray;
00288   vector<unsigned> destid;
00289   
00290   gd.len       = glen;
00291   gd.unit_size = sizeof(xservice_data_t);
00292   
00293   unsigned bsz = blockSize(glen, total);
00294     
00295   sd.rank  = rank;
00296   sd.start = computeBlockBoundInf0(bsz, rank);
00297   sd.len   = localBlockLengthO(glen, rank, total, bsz);
00298 #ifdef NO_COM
00299   sd.base  = (char*) 0x1000;
00300 #else
00301   sd.base  = (char*) malloc(sd.len*gd.unit_size);
00302   xservice_data_t* p=(xservice_data_t*) sd.base;
00303   for( unsigned i=0; i < sd.len; i++) {
00304     *p++ = sd.start+i;
00305   }
00306 #endif
00307   
00308 #if 0
00309   cerr << "Dumping data: ";
00310   for(unsigned k=0; k < sd.len; k++)
00311     cerr << " " << ((xservice_data_t*)sd.base)[k];
00312   cerr << endl;
00313 #endif
00314 
00315   stopo.total = total;
00316   dtopo.total = dtotal;
00317   
00318   computeSendDataBlock1D(gd, sd, stopo, dtopo, vdarray, destid, mode);
00319   
00322   // send data to remote nodes
00323 
00324   padico_mpcircuit_barrier(schd_mpc);
00325 
00326   cerr << "\n #vdarray: " << vdarray.size() << "\n";
00327   for(unsigned i=0; i< vdarray.size(); i++) {
00328     cout << "Dumping vdarray["<<i<<"] to " << destid[i] << " :\n";
00329     cout << "   topo / gd : " <<  vdarray[i]->topo().total << " / " << vdarray[i]->gd().len << " / " << vdarray[i]->gd().unit_size << endl;
00330     cout << " dist # " <<  vdarray[i]->dist().length() << endl;
00331     for(unsigned j=0; j< vdarray[i]->dist().length(); j++) {
00332       cout << " rank/low/len (len): " <<  vdarray[i]->dist()[j].rank << " / " << vdarray[i]->dist()[j].start << " / " << vdarray[i]->dist()[j].len << " ( " << vdarray[i]->getDataLength(j) << " )" <<endl;
00333     }
00334       
00335 #ifndef NO_COM
00336     //        cerr << " data: "; fprintf(stderr, "%p:", &vdarray[i].Data(j, 0);
00337     //        for(unsigned k=0; k < vdarray[i].getDataLength(j); k++)
00338     //   cerr << " " << vdarray[i].Data(j, k);
00339 #endif
00340     cerr << endl;
00341     
00342     //      pglobal->xserviceList[destid[i]]->sendData(vdarray[i]);
00343     
00344   }   
00345   return 0;
00346 }
00347 
00348 int main(int argc, char** argv) {
00349 
00350   MPI_Init(&argc, &argv);
00351 
00352   if (argc != 4) {
00353     fprintf(stderr,"Usage: %s len source_total dest_total_nodes\n", argv[0]);
00354     exit(1);
00355   }
00356   
00357   unsigned glen, stotal, dtotal;
00358   
00359   sscanf(argv[1],"%d",&glen);
00360   sscanf(argv[2],"%d",&stotal);
00361   sscanf(argv[3],"%d",&dtotal);
00362 
00363   fprintf(stderr,"Distribution block 1D with len=%d stotal=%d dtotal=%d\n", glen, stotal, dtotal);
00364 
00365  int rank;
00366  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
00367  pid=rank;
00368 
00369   //  for(unsigned rank=0; rank< stotal; rank++) {
00370  cerr << endl << "------------------------------- " << rank << " / " << stotal << endl;
00371  simSendDataBlock1D(glen, stotal, rank, dtotal, PaCO::ClientSide);
00372     //  }
00373 
00374  cerr << pid << ": ENDING !!\n";
00375 
00376   MPI_Finalize();
00377 }
00378 #endif