PaCO++
0.05
|
00001 /* Padico Advanced Examples 00002 * author: Christian Pérez 00003 */ 00004 00005 #include <stdio.h> 00006 #include <Padico/MPCircuit.h> 00007 00008 #include "Schedule.h" 00009 #include "Internal.h" 00010 #include "DistributionBloc.h" 00011 00012 //#define NO_COM 00013 00014 #undef STANDALONE_FILE 00015 00016 #define DEBUG_INTERNAL 00017 #define DEBUG_COMM 00018 00019 #ifdef NO_COM 00020 inline char* mymalloc(unsigned sz) { 00021 static unsigned base=0x1000000; 00022 unsigned ret = base; 00023 base+=0x1000; 00024 return (char*) ret; 00025 } 00026 #endif 00027 00028 #ifdef STANDALONE_FILE 00029 #include "../Generated/Concret.h" 00030 #include "../Generated/XServiceType.h" 00031 #endif 00032 00033 00034 /************************************************************/ 00035 /************************************************************/ 00036 /************************************************************/ 00037 00038 // vOut represents what localData/stopo have to send to nodes of dtopo (vOut[].rank is in dtopo space) 00039 void computeSendBlock1D(const GlobalData_t& gd, const LocalData_t &sd, 00040 const Topology_t &stopo, const Topology_t &dtopo, 00041 vector<LocalData_t>& vOut) { 00042 00043 #ifdef DEBUG_INTERNAL 00044 cerr << "\nIn compute Send Schedule--------------------\n"; 00045 00046 fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total); 00047 fprintf(stderr, "gd.len %ld\tsd.start %d\tsd.len %d\n",gd.len, sd.start, sd.len); 00048 #endif 00049 00050 if (stopo.total == dtopo.total) { 00051 // vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node 00052 vOut.push_back(sd); 00053 #ifdef DEBUG_INTERNAL 00054 fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base); 00055 #endif 00056 } else { 00057 // Append mode 00058 vOut.reserve(vOut.size()+dtopo.total); // in bloc mode, at most one msg to each dest node 00059 00060 unsigned dbsz = blockSize(gd.len, dtopo.total); 00061 00062 unsigned long slow = sd.start; 00063 unsigned long shigh = slow + sd.len; 00064 00065 unsigned fpid, lpid; 00066 fpid = getProcRangeInf(slow, dbsz); 00067 lpid = getProcRangeSup(shigh, dbsz); 00068 00069 #ifdef DEBUG_INTERNAL 00070 fprintf(stderr, " loop from %d to %d width dtotal: %ld\n", fpid, lpid, dtopo.total); 00071 #endif 00072 00073 // for each dest bloc 00074 for(unsigned i=fpid; i <= lpid; i++) { 00075 00076 vOut.resize(vOut.size()+1); 00077 LocalData_t& s = vOut[vOut.size()-1]; 00078 00079 s.rank = i; 00080 unsigned tmp = i*dbsz; 00081 s.start = ( slow >= tmp)?slow:tmp; // max 00082 00083 tmp = (i+1)*dbsz; 00084 unsigned end = ( shigh <= tmp)?shigh:tmp; // min 00085 00086 s.len = end - s.start; 00087 00088 s.base = sd.base + ((s.start - sd.start) * gd.unit_size); 00089 00090 #ifdef DEBUG_INTERNAL 00091 fprintf(stderr, " s: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base); 00092 #endif 00093 } 00094 } 00095 #ifdef DEBUG_INTERNAL 00096 cerr << "\nIn compute Send Schedule-------------------- done\n"; 00097 #endif 00098 } 00099 00103 00104 // vOut represents what localData/dtopo have to receive from nodes of stopo (vOut[].rank is in stopo space) 00105 void computeReceiveBlock1D(const GlobalData_t& gd, const LocalData_t &dd, 00106 const Topology_t &stopo, const Topology_t &dtopo, 00107 vector<LocalData_t>& vOut) { 00108 00109 #ifdef DEBUG_INTERNAL 00110 cerr << "\nIn compute Receive Schedule--------------------\n"; 00111 00112 fprintf(stderr, "stopo: %ld\tdtopo: %ld\n",stopo.total, dtopo.total); 00113 fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len); 00114 00115 #endif 00116 00117 if (stopo.total == dtopo.total) { 00118 vOut.push_back(dd); 00119 #ifdef DEBUG_INTERNAL 00120 fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base); 00121 #endif 00122 } else { 00123 // Apend mode 00124 vOut.reserve(vOut.size()+stopo.total); // in bloc mode, at most one msg from each src node 00125 00126 unsigned sbsz = blockSize(gd.len, stopo.total); 00127 00128 unsigned long dlow = dd.start; 00129 unsigned long dhigh = dlow + dd.len; 00130 00131 unsigned fpid, lpid; 00132 fpid = getProcRangeInf(dlow, sbsz); 00133 lpid = getProcRangeSup(dhigh, sbsz); 00134 00135 #ifdef DEBUG_INTERNAL 00136 fprintf(stderr, " loop from %d to %d width stotal: %ld\n", fpid, lpid, stopo.total); 00137 #endif 00138 00139 // for each dest bloc 00140 for(unsigned i=fpid; i <= lpid; i++) { 00141 00142 vOut.resize(vOut.size()+1); 00143 LocalData_t& s = vOut[vOut.size()-1]; 00144 00145 s.rank = i; 00146 unsigned tmp = i*sbsz; 00147 s.start = ( dlow >= tmp)?dlow:tmp; // max 00148 00149 tmp = (i+1)*sbsz; 00150 unsigned end = ( dhigh <= tmp)?dhigh:tmp; // min 00151 00152 s.len = end - s.start; 00153 00154 s.base = dd.base + ((s.start - dd.start) * gd.unit_size); 00155 00156 #ifdef DEBUG_INTERNAL 00157 fprintf(stderr, " r: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base); 00158 #endif 00159 } 00160 } 00161 } 00162 00163 /************************************************************/ 00164 /************************************************************/ 00165 /************************************************************/ 00166 00167 // ctopo is the current topo 00168 // sdata: a pointer to the source data (pointer to local data) 00169 // ddata: a pointer to the base address of of reveive (pointer to local data) 00170 void doSchedule(const GlobalData_t& gd, const LocalData_t& ld, const Topology_t &ctopo, 00171 vector<LocalData_t>& sched_send, vector<LocalData_t>& sched_recv, void* comm) { 00172 00173 cerr << "\nIn doSchedule--------------------\n"; 00174 00175 padico_mpcircuit_t schd_mpc = (padico_mpcircuit_t) comm; 00176 00177 if (sched_send.size() || sched_recv.size()) { 00178 00179 #ifndef NO_COM 00180 void* rreq[sched_recv.size()]; 00181 unsigned ri; 00182 ri=0; 00183 #endif 00184 00185 vector<LocalData_t*> local_recv; 00186 vector<LocalData_t*> local_send; 00187 00188 local_recv.clear(); 00189 local_send.clear(); 00190 00192 // Sending data 00193 00194 // Post Asynchronous MPCircuit receive 00195 #ifdef DEBUG_COM 00196 cerr << " #sched_recv: " << sched_recv.size() << endl; 00197 #endif 00198 for(unsigned i=0; i < sched_recv.size(); i++) { 00199 unsigned from = getProcId(sched_recv[i].rank, ctopo); 00200 if (from == ld.rank) { 00201 #ifdef DEBUG_COMM 00202 fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i, 00203 sched_recv[i].start, sched_recv[i].len, from); 00204 #endif 00205 local_recv.push_back(&sched_recv[i]); 00206 } else { 00207 #ifdef DEBUG_COMM 00208 fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i, 00209 sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base); 00210 #endif 00211 00212 #ifndef NO_COM 00213 rreq[ri++] = padico_mpcircuit_Irecv(sched_recv[i].base, sched_recv[i].len*gd.unit_size, 00214 from, 51, schd_mpc ); 00215 #endif 00216 } 00217 } 00218 00219 // Send data via MPCircuit 00220 #ifdef DEBUG_COMM 00221 cerr << " #sched_send: " << sched_send.size() << endl; 00222 #endif 00223 for(unsigned i=0; i < sched_send.size(); i++) { 00224 unsigned to = getProcId(sched_send[i].rank, ctopo); 00225 if (to == ld.rank) { 00226 #ifdef DEBUG_COMM 00227 fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i, 00228 sched_send[i].start, sched_send[i].len, to); 00229 #endif 00230 local_send.push_back(&sched_send[i]); 00231 } else { 00232 #ifdef DEBUG_COMM 00233 fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d base=%p\n", i, 00234 sched_send[i].start, sched_send[i].len, to, sched_send[i].base); 00235 #endif 00236 00237 #ifndef NO_COM 00238 padico_mpcircuit_send(sched_send[i].base, sched_send[i].len*gd.unit_size, 00239 to, 51, schd_mpc); 00240 #endif 00241 } 00242 } 00243 00244 // Do local communication vie memcpy 00245 if (local_recv.size() != local_send.size()) { 00246 cerr << "Error: local recv & send have different size: " << local_recv.size() << " " << local_send.size() << endl; 00247 } 00248 for(unsigned i=0; i < local_recv.size(); i++) { 00249 if (local_recv[i]->len != local_send[i]->len) { 00250 cerr << "Error: local recv & send have different len for i= "<<i<< " :" << local_recv[i]->len << " " << local_send[i]->len << endl; 00251 } 00252 #ifdef DEBUG_COMM 00253 fprintf(stderr, " local: scheds no=%d start=%d len=%d\n", i, 00254 sched_send[i].start, sched_send[i].len); 00255 #endif 00256 #ifndef NO_COM 00257 memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len*gd.unit_size); 00258 #endif 00259 } 00260 00261 00262 // Wait all receive & send 00263 #ifndef NO_COM 00264 #ifdef DEBUG_INTERNAL 00265 cerr << "WAITING local communications to end...\n"; 00266 #endif 00267 00268 padico_mpcircuit_waitAll(rreq, ri); 00269 #ifdef DEBUG_INTERNAL 00270 cerr << "WAITING local communications to end...ok \n"; 00271 #endif 00272 #endif 00273 } 00274 } 00275 00279 00280 #ifdef STANDALONE_FILE 00281 int simSendDataBlock1D(unsigned int glen, int total, int rank, int dtotal, const PaCO::distLoc_t& mode) { 00282 00283 GlobalData_t gd; 00284 LocalData_t sd; 00285 Topology_t stopo, dtopo; 00286 00287 vConcret vdarray; 00288 vector<unsigned> destid; 00289 00290 gd.len = glen; 00291 gd.unit_size = sizeof(xservice_data_t); 00292 00293 unsigned bsz = blockSize(glen, total); 00294 00295 sd.rank = rank; 00296 sd.start = computeBlockBoundInf0(bsz, rank); 00297 sd.len = localBlockLengthO(glen, rank, total, bsz); 00298 #ifdef NO_COM 00299 sd.base = (char*) 0x1000; 00300 #else 00301 sd.base = (char*) malloc(sd.len*gd.unit_size); 00302 xservice_data_t* p=(xservice_data_t*) sd.base; 00303 for( unsigned i=0; i < sd.len; i++) { 00304 *p++ = sd.start+i; 00305 } 00306 #endif 00307 00308 #if 0 00309 cerr << "Dumping data: "; 00310 for(unsigned k=0; k < sd.len; k++) 00311 cerr << " " << ((xservice_data_t*)sd.base)[k]; 00312 cerr << endl; 00313 #endif 00314 00315 stopo.total = total; 00316 dtopo.total = dtotal; 00317 00318 computeSendDataBlock1D(gd, sd, stopo, dtopo, vdarray, destid, mode); 00319 00322 // send data to remote nodes 00323 00324 padico_mpcircuit_barrier(schd_mpc); 00325 00326 cerr << "\n #vdarray: " << vdarray.size() << "\n"; 00327 for(unsigned i=0; i< vdarray.size(); i++) { 00328 cout << "Dumping vdarray["<<i<<"] to " << destid[i] << " :\n"; 00329 cout << " topo / gd : " << vdarray[i]->topo().total << " / " << vdarray[i]->gd().len << " / " << vdarray[i]->gd().unit_size << endl; 00330 cout << " dist # " << vdarray[i]->dist().length() << endl; 00331 for(unsigned j=0; j< vdarray[i]->dist().length(); j++) { 00332 cout << " rank/low/len (len): " << vdarray[i]->dist()[j].rank << " / " << vdarray[i]->dist()[j].start << " / " << vdarray[i]->dist()[j].len << " ( " << vdarray[i]->getDataLength(j) << " )" <<endl; 00333 } 00334 00335 #ifndef NO_COM 00336 // cerr << " data: "; fprintf(stderr, "%p:", &vdarray[i].Data(j, 0); 00337 // for(unsigned k=0; k < vdarray[i].getDataLength(j); k++) 00338 // cerr << " " << vdarray[i].Data(j, k); 00339 #endif 00340 cerr << endl; 00341 00342 // pglobal->xserviceList[destid[i]]->sendData(vdarray[i]); 00343 00344 } 00345 return 0; 00346 } 00347 00348 int main(int argc, char** argv) { 00349 00350 MPI_Init(&argc, &argv); 00351 00352 if (argc != 4) { 00353 fprintf(stderr,"Usage: %s len source_total dest_total_nodes\n", argv[0]); 00354 exit(1); 00355 } 00356 00357 unsigned glen, stotal, dtotal; 00358 00359 sscanf(argv[1],"%d",&glen); 00360 sscanf(argv[2],"%d",&stotal); 00361 sscanf(argv[3],"%d",&dtotal); 00362 00363 fprintf(stderr,"Distribution block 1D with len=%d stotal=%d dtotal=%d\n", glen, stotal, dtotal); 00364 00365 int rank; 00366 MPI_Comm_rank(MPI_COMM_WORLD, &rank); 00367 pid=rank; 00368 00369 // for(unsigned rank=0; rank< stotal; rank++) { 00370 cerr << endl << "------------------------------- " << rank << " / " << stotal << endl; 00371 simSendDataBlock1D(glen, stotal, rank, dtotal, PaCO::ClientSide); 00372 // } 00373 00374 cerr << pid << ": ENDING !!\n"; 00375 00376 MPI_Finalize(); 00377 } 00378 #endif