PaCO++ 0.05
/* Padico Advanced Examples
 * author: Christian Pérez
 */

#include <stdio.h>
#include <string.h>   /* for memcpy */
#include <mpi.h>

#include <algorithm>

#include "Schedule.h"
#include "Internal.h"
#include "DistributionBloc.h"

#include <vector>
#include <iostream>

using namespace std;

#undef DEBUG_INTERNAL
#undef DEBUG_COMM

/************************************************************/
/************************************************************/
/************************************************************/

// Ascending start-offset ordering for schedule entries
bool cmp_rank(const LocalData_t& a, const LocalData_t& b)
{
  return a.start < b.start;
}


/************************************************************/
/************************************************************/
/************************************************************/

// vOut represents what localData/stopo have to send to nodes of dtopo (vOut[].rank is in dtopo space)
void computeSendBlock1D(const GlobalData_t& gd, const LocalData_t& sd,
                        const Topology_t& stopo, const Topology_t& dtopo,
                        const ParisBlock_param_t* param, vector<LocalData_t>& vOut) {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Send Schedule--------------------\n";
  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n", stopo.total, dtopo.total);
  fprintf(stderr, "gd.len %ld\tgd.cyclic: %ld\tsd.start %d\tsd.len %d\n", gd.len, gd.cyclic, sd.start, sd.len);
#endif

  if (stopo.total == dtopo.total) {
    // same number of nodes on both sides: the local piece is sent as-is
    // vOut.reserve(vOut.size()+dtopo.total); // in block mode, at most one msg to each dest node
    vOut.push_back(sd);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", sd.rank, sd.start, sd.len, sd.base);
#endif
  } else {
    // append mode
    vOut.reserve(vOut.size() + dtopo.total); // in block mode, at most one msg to each dest node

    unsigned slbsz = blockSize(gd.len, stopo.total, param);

    if (gd.cyclic == 0) {
      // standard block redistribution

      unsigned long slow, shigh;
      computeBlockBounds(&slow, &shigh, gd.len, sd.rank, stopo.total, slbsz, 0);

      unsigned dlbsz = blockSize(gd.len, dtopo.total, param);

      // destination processes whose blocks overlap [slow, shigh)
      unsigned fpid = getProcRangeInf(slow, dlbsz);
      unsigned lpid = getProcRangeSup(shigh, dlbsz);

#ifdef DEBUG_INTERNAL
      fprintf(stderr, " loop from %d to %d with dtotal: %ld\n", fpid, lpid, dtopo.total);
#endif

      // for each dest block
      for (unsigned i = fpid; i <= lpid; i++) {

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank = i;
        unsigned tmp = i * dlbsz;
        s.start = (slow >= tmp) ? slow : tmp;        // max

        tmp = (i + 1) * dlbsz;
        unsigned end = (shigh <= tmp) ? shigh : tmp; // min

        s.len = end - s.start;

        s.base = sd.base + ((s.start - sd.start) * gd.unit_size);

#ifdef DEBUG_INTERNAL
        fprintf(stderr, " s1: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }

    } else {
      // block-cyclic distribution

      unsigned stbsz  = slbsz * stopo.total;
      unsigned nbbloc = NumberOfBlockProc(gd.len, stopo.total, slbsz, sd.rank);

      // for each src block, find a dst node
      for (unsigned b = 0; b < nbbloc; b++) {
        unsigned gb    = b * stopo.total + sd.rank;  // global block id
        unsigned drank = OwnerBlock(gb, dtopo.total);

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank  = drank;
        s.start = (stbsz * b) + (sd.rank * slbsz);
        s.len   = BlockNumberOfElementProc(gd.len, sd.rank, stopo.total, slbsz, b);
        s.base  = sd.base + (b * slbsz * gd.unit_size);

#ifdef DEBUG_INTERNAL
        fprintf(stderr, " s2: to:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    }
  }

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Send Schedule-------------------- done\n";
#endif
}


// vOut represents what localData/dtopo have to receive from nodes of stopo (vOut[].rank is in stopo space)
void computeReceiveBlock1D(const GlobalData_t& gd, const LocalData_t& dd,
                           const Topology_t& stopo, const Topology_t& dtopo,
                           const ParisBlock_param_t* param, vector<LocalData_t>& vOut) {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn compute Receive Schedule--------------------\n";
  fprintf(stderr, "stopo: %ld\tdtopo: %ld\n", stopo.total, dtopo.total);
  fprintf(stderr, "gd.len %ld\tdd.start %d\tdd.len %d\n", gd.len, dd.start, dd.len);
#endif

  if (stopo.total == dtopo.total) {
    vOut.push_back(dd);
#ifdef DEBUG_INTERNAL
    fprintf(stderr, " rank:%d start:%d len:%d base:%p\n", dd.rank, dd.start, dd.len, dd.base);
#endif
  } else {
    // append mode
    vOut.reserve(vOut.size() + stopo.total); // in block mode, at most one msg from each src node

    unsigned slbsz = blockSize(gd.len, stopo.total, param);

    if (gd.cyclic == 0) {
      // standard block redistribution

      unsigned long dlow  = dd.start;
      unsigned long dhigh = dlow + dd.len;

      // source processes whose blocks overlap [dlow, dhigh)
      unsigned fpid = getProcRangeInf(dlow, slbsz);
      unsigned lpid = getProcRangeSup(dhigh, slbsz);

#ifdef DEBUG_INTERNAL
      fprintf(stderr, " loop from %d to %d with stotal: %ld\n", fpid, lpid, stopo.total);
#endif

      // for each src block
      for (unsigned i = fpid; i <= lpid; i++) {

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank = i;
        unsigned tmp = i * slbsz;
        s.start = (dlow >= tmp) ? dlow : tmp;        // max

        tmp = (i + 1) * slbsz;
        unsigned end = (dhigh <= tmp) ? dhigh : tmp; // min

        s.len = end - s.start;

        s.base = dd.base + ((s.start - dd.start) * gd.unit_size);

#ifdef DEBUG_INTERNAL
        fprintf(stderr, " r1: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    } else {
      // block-cyclic distribution

      unsigned dlbsz  = blockSize(gd.len, dtopo.total, param);
      unsigned dtbsz  = dlbsz * dtopo.total;
      unsigned nbbloc = NumberOfBlockProc(gd.len, dtopo.total, dlbsz, dd.rank);

      // for each dst block, find a src node
      for (unsigned b = 0; b < nbbloc; b++) {
        unsigned gb    = b * dtopo.total + dd.rank;  // global block id
        unsigned srank = OwnerBlock(gb, stopo.total);

        vOut.resize(vOut.size() + 1);
        LocalData_t& s = vOut[vOut.size() - 1];

        s.rank  = srank;
        s.start = dtbsz * b + dd.rank * dlbsz;
        s.len   = BlockNumberOfElementProc(gd.len, dd.rank, dtopo.total, dlbsz, b);
        s.base  = dd.base + b * dlbsz * gd.unit_size;

#ifdef DEBUG_INTERNAL
        fprintf(stderr, " r2: from:%d start:%d len:%d base:%p\n", s.rank, s.start, s.len, s.base);
#endif
      }
    }
  }
}

/************************************************************/
/************************************************************/
/************************************************************/

// ctopo is the current topology
// sdata: a pointer to the source data (pointer to local data)
// ddata: a pointer to the base address of the receive buffer (pointer to local data)
void doSchedule(const GlobalData_t& gd, const LocalData_t& ld, const Topology_t& ctopo,
                vector<LocalData_t>& sched_send, vector<LocalData_t>& sched_recv, void* comm) {

#ifdef DEBUG_INTERNAL
  cerr << "\nIn doSchedule--------------------\n";
#endif

  MPI_Comm mpi_comm = *(MPI_Comm*) comm;

#ifdef DEBUG_COMM
  fprintf(stderr, " MPI_COMM_WORLD=%d mpi_comm=%d\n", MPI_COMM_WORLD, mpi_comm);
#endif

  if (sched_send.size() || sched_recv.size()) {

    MPI_Request sreq[sched_send.size()];
    MPI_Request rreq[sched_recv.size()];
    unsigned si = 0;
    unsigned ri = 0;

    MPI_Status sstat[sched_send.size()];
    MPI_Status rstat[sched_recv.size()];

    vector<LocalData_t*> local_recv;
    vector<LocalData_t*> local_send;

    local_recv.clear();
    local_send.clear();

    // Sorting data (so that local sends and receives pair up in the same order)

    if (sched_send.size()) std::stable_sort(sched_send.begin(), sched_send.end(), cmp_rank);
    if (sched_recv.size()) std::stable_sort(sched_recv.begin(), sched_recv.end(), cmp_rank);

    // Sending data

    // Post asynchronous MPI receives
#ifdef DEBUG_COMM
    cerr << " #sched_recv: " << sched_recv.size() << endl;
#endif
    for (unsigned i = 0; i < sched_recv.size(); i++) {
      unsigned from = getProcId(sched_recv[i].rank, ctopo);
      if (from == ld.rank) {
#ifdef DEBUG_COMM
        fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d LOCAL\n", i,
                sched_recv[i].start, sched_recv[i].len, from);
#endif
        local_recv.push_back(&sched_recv[i]);
      } else {
#ifdef DEBUG_COMM
        fprintf(stderr, " recv: schedr no=%d start=%d len=%d from=%d base=%p\n", i,
                sched_recv[i].start, sched_recv[i].len, from, sched_recv[i].base);
#endif
        int err = MPI_Irecv(sched_recv[i].base, sched_recv[i].len * gd.unit_size,
                            MPI_BYTE, from, 51, mpi_comm, &rreq[ri++]);
        if (err != MPI_SUCCESS) {
          cerr << "ERROR IN MPI_Irecv: return value is " << err << endl;
        }
      }
    }

    // Send data via MPI
#ifdef DEBUG_COMM
    cerr << " #sched_send: " << sched_send.size() << endl;
#endif
    for (unsigned i = 0; i < sched_send.size(); i++) {
      unsigned to = getProcId(sched_send[i].rank, ctopo);
      if (to == ld.rank) {
#ifdef DEBUG_COMM
        fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d LOCAL\n", i,
                sched_send[i].start, sched_send[i].len, to);
#endif
        local_send.push_back(&sched_send[i]);
      } else {
#ifdef DEBUG_COMM
        fprintf(stderr, " send: scheds no=%d start=%d len=%d to=%d base=%p\n", i,
                sched_send[i].start, sched_send[i].len, to, sched_send[i].base);
#endif
        int err = MPI_Isend(sched_send[i].base, sched_send[i].len * gd.unit_size,
                            MPI_BYTE, to, 51, mpi_comm, &sreq[si++]);
        if (err != MPI_SUCCESS) {
          cerr << "ERROR IN MPI_Isend: return value is " << err << endl;
        }
      }
    }

    // Do local communications via memcpy
    if (local_recv.size() != local_send.size()) {
      cerr << "Error: local recv & send have different sizes: " << local_recv.size() << " " << local_send.size() << endl;
    }
    for (unsigned i = 0; i < local_recv.size(); i++) {
      if (local_recv[i]->len != local_send[i]->len) {
        cerr << "Error: local recv & send have different len for i= " << i << " :" << local_recv[i]->len << " " << local_send[i]->len << endl;
      }
#ifdef DEBUG_COMM
      fprintf(stderr, " local: scheds no=%d start=%d len=%d\n", i,
              local_send[i]->start, local_send[i]->len);
#endif
      memcpy(local_recv[i]->base, local_send[i]->base, local_send[i]->len * gd.unit_size);
    }

    // Wait for all receives & sends to complete
#ifdef DEBUG_INTERNAL
    cerr << "WAITING local communications to end...\n";
#endif

    int err;
    err = MPI_Waitall(si, sreq, sstat);
    if (err != MPI_SUCCESS) {
      cerr << "ERROR IN MPI_Waitall for send: return value is " << err << endl;
    }
    err = MPI_Waitall(ri, rreq, rstat);
    if (err != MPI_SUCCESS) {
      cerr << "ERROR IN MPI_Waitall for recv: return value is " << err << endl;
    }
#ifdef DEBUG_INTERNAL
    cerr << "WAITING local communications to end...ok\n";
#endif
  }
}
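
The listing above only provides the building blocks. The sketch below shows one plausible way to chain them on each node of a 1-D block redistribution: compute the send schedule from the local piece in the source distribution, compute the receive schedule from the local piece in the target distribution, then let doSchedule post the non-blocking MPI transfers and local copies. It is not part of the PaCO++ sources: the wrapper name redistributeBlock1D, its parameter list, and the choice of passing the source descriptor to doSchedule (which only reads ld.rank to detect self-messages) are assumptions made for illustration, and it presumes Schedule.h declares the three routines and the descriptor types used in this file.

// Hypothetical driver, not part of PaCO++: assumes the project headers are
// available and that the descriptors below were filled in by the caller.
#include <mpi.h>
#include <vector>

#include "Schedule.h"

void redistributeBlock1D(const GlobalData_t& gd,
                         const LocalData_t& src_ld,   // local piece in the source distribution (base = send buffer)
                         const LocalData_t& dst_ld,   // local piece in the target distribution (base = receive buffer)
                         const Topology_t& stopo,     // source topology
                         const Topology_t& dtopo,     // destination topology
                         const Topology_t& ctopo,     // topology used for the actual communication
                         const ParisBlock_param_t* param,
                         MPI_Comm comm)
{
  std::vector<LocalData_t> sched_send;
  std::vector<LocalData_t> sched_recv;

  // What this node must send (ranks expressed in dtopo space)...
  computeSendBlock1D(gd, src_ld, stopo, dtopo, param, sched_send);
  // ...and what it must receive (ranks expressed in stopo space).
  computeReceiveBlock1D(gd, dst_ld, stopo, dtopo, param, sched_recv);

  // Post MPI_Isend/MPI_Irecv for the remote pieces, memcpy the local ones,
  // and wait for completion (assumes src_ld.rank is this node's rank in ctopo).
  doSchedule(gd, src_ld, ctopo, sched_send, sched_recv, &comm);
}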