SALSA Analysis Modules
icmk.c
Go to the documentation of this file.
00001 /*! \file icmk.c \ingroup categories
00002   \brief Functions for computing the ICMK structure of a matrix
00003   
00004   \section icmk Inverse Cuthill-McKee distribution
00005 
00006   ICMK (`Inverse Cuthill-McKee') is a heuristic (Eijkhout[2001]) that,
00007   based on the 
00008   sparsity structure of a matrix, tries to find a meaningful block structure.
00009   This is done without permuting the matrix. This block structure can be used
00010   in Block Jacobi preconditioners: distributing the matrix over parallel
00011   processors according to the block structure often improves convergence.
00012 
00013   \section Usage
00014 
00015   Activate this module with:
00016 
00017   PetscErrorCode RegisterICMKModules();
00018 
00019   Compute these elements with
00020 
00021   ComputeQuantity("icmk","element",A,&res,&flg);
00022 
00023   Available elements are:
00024   - "nsplits" : number of split points found in the matrix
00025   - "splits" : the sequence of split points, including 0 and N
00026 
00027   \section References
00028 \verbatim
00029 @techreport{Eijk:auto-block,
00030 author = {Victor Eijkhout},
00031 title = {Automatic Determination of Matrix Blocks},
00032 institution = {Department of Computer Science, University of Tennessee},
00033 number = {ut-cs-01-458},
00034 note = {Lapack Working Note 151},
00035 year = {2001}
00036 }
00037 \endverbatim
00038 */
00039 #include <stdlib.h>
00040 #include "anamod.h"
00041 #include "anamodsalsamodules.h"
00042 #include "icmk.h"
00043 #include "petscmat.h"
00044 #include "petscis.h"
00045 #include "petsc.h"
00046 #include "anamodutils.h"
00047 
00048 /* #define TRACE_MSGS 1 */
00049 
00050 static void iSpaces(int len) {int i;for (i=0;i<len;i++){printf(" ");}}
00051 
00052 #undef __FUNCT__
00053 #define __FUNCT__ "MatIsNull"
00054 /* MatIsNull: test whether a matrix is wholly zero.
00055  * This can be optimised with knowledge of the implementation.
00056  */
00057 int MatIsNull(Mat A,int *flag)
00058 {
00059   int size,row,ncol,ierr;
00060   PetscFunctionBegin;
00061   ierr = MatGetSize(A,&size,&ncol); CHKERRQ(ierr);
00062   *flag = 1;
00063   for (row=0; row<size; row++) {
00064     ierr = MatGetRow(A,row,&ncol,PETSC_NULL,PETSC_NULL); CHKERRQ(ierr);
00065     if (ncol>0) *flag = 0;
00066     ierr = MatRestoreRow(A,row,&ncol,PETSC_NULL,PETSC_NULL); CHKERRQ(ierr);
00067     if (*flag==0) break;
00068   }
00069   PetscFunctionReturn(0);
00070 }
00071 
00072 #undef __FUNCT__
00073 #define __FUNCT__ "MatSubMatIsNull"
00074 /* MatSubMatIsNull: test whether a matrix is partly zero.
00075  * (bounds are inclusive)
00076  * This can be optimised with knowledge of the implementation.
00077  */
00078 int MatSubMatIsNull(Mat A,int i1,int i2,int j1,int j2,PetscTruth *isnull)
00079 {
00080   int first,last,row,ncol,j,ierr; const int *cols;
00081   PetscFunctionBegin;
00082   ierr = MatGetOwnershipRange(A,&first,&last); CHKERRQ(ierr);
00083   if (i1<first || i2>=last) SETERRQ(1,"Requesting (part) non-local submat");
00084   *isnull = PETSC_TRUE;
00085   for (row=i1; row<=i2; row++) {
00086     ierr = MatGetRow(A,row,&ncol,&cols,PETSC_NULL); CHKERRQ(ierr);
00087     if (ncol>=0)
00088       if (cols[0]<=j2 && cols[ncol-1]>=j1)
00089         for (j=0; j<ncol; j++)
00090           if (cols[j]>=j1 && cols[j]<=j2) *isnull = PETSC_FALSE;
00091     ierr = MatRestoreRow(A,row,&ncol,&cols,PETSC_NULL); CHKERRQ(ierr);
00092     if (!*isnull) break;
00093   }
00094   PetscFunctionReturn(0);
00095 }
00096 
00097 #undef __FUNCT__
00098 #define __FUNCT__ "PrintArray"
00099 static PetscErrorCode PrintArray(int *ar,int len,char *txt)
00100 {
00101   int i;
00102   PetscFunctionBegin;
00103   printf("%s: ",txt); for (i=0; i<len; i++) printf("%d,",ar[i]); printf("\n");
00104   PetscFunctionReturn(0);
00105 }
00106 
00107 #undef __FUNCT__
00108 #define __FUNCT__ "CondenseChain"
00109 /* Condense a chain to make it have the right length:
00110  * find smallest link and merge that with the smaller
00111  * of its neighbours.
00112  * NP len = #blocks = |chain|-1
00113  */
00114 static PetscErrorCode CondenseChain(int *chain,int len,int tar,IS *israr)
00115 {
00116   int i,ierr; int *rar; IS isret;
00117 #define CHDIF(a) rar[a+1]-rar[a]
00118   PetscFunctionBegin;
00119   if (len<tar) SETERRQ(1,"chain length less than target");
00120   /* nasty: we are not allowed to do anything to 'chain', so we need to
00121    * overallocate the return array; just by a few elements, though */
00122   ierr = PetscMalloc((len+1)*sizeof(int),&rar); CHKERRQ(ierr);
00123   for (i=0; i<=len; i++) {rar[i] = chain[i];}
00124   while (len>tar) {
00125     int lloc=0,lmin=CHDIF(0),l,shift_start;
00126     for (i=1; i<len; i++) {
00127       l = CHDIF(i); if (l<lmin) {lloc=i; lmin=l;}}
00128     if (lloc==0 || (lloc<len-1 && CHDIF(lloc+1)<CHDIF(lloc-1))) {
00129       shift_start = lloc+1; } else { shift_start = lloc;}
00130     for (i=shift_start; i<len; i++) rar[i] = rar[i+1];
00131     len--;
00132   }
00133 #ifdef TRACE_MSGS
00134   ierr = PrintArray(rar,len+1,"Condensed chain"); CHKERRQ(ierr);
00135 #endif
00136   ierr = ISCreateGeneral(MPI_COMM_SELF,len+1,rar,&isret); CHKERRQ(ierr);
00137   *israr = isret;
00138   PetscFunctionReturn(0);
00139 }
00140 
00141 /*
00142  * b: point from which we need to link
00143  */
00144 double mq; int nc,ml;
00145 #define SPLIT_BLOCK 3
00146 #undef __FUNCT__
00147 #define __FUNCT__ "CanChain"
00148 /* CanChain: recursively find all chains of split points of
00149  * at least the wanted length,
00150  * and yield up the highest quality one, suitably contracted.
00151  */
00152 static PetscErrorCode CanChain
00153 (int lev,int b,int N,int *splits,int nsplits,int *chain,int len,int q,IS *rar)
00154 {
00155   int nnext,next,*conts,ierr;
00156   double dq=(1.*q)/(len-1); 
00157   PetscFunctionBegin;
00158   if (len>=nsplits) SETERRQ(1,"chain overflow");
00159   chain[len] = b;
00160 #if TRACE_MSGS
00161   printf("[%d] ",lev); PrintArray(chain,len+1,"chain so far");
00162 #endif
00163   if (b==N) {
00164     ierr = ISCreateGeneral(MPI_COMM_SELF,len+1,chain,rar); CHKERRQ(ierr);
00165     PetscFunctionReturn(0);}
00166   {
00167     int split_start,split_end,isplit,next_block,contd,level;
00168     for (split_start=0; splits[split_start]<b; split_start+=SPLIT_BLOCK) ;
00169     /*
00170     if (splits[split_start]>b) {
00171 #if TRACE_MSGS
00172       printf(".. there is no block starting at %d\n",b);
00173 #endif
00174       PetscFunctionReturn(0);}
00175     */
00176     for (isplit=split_start;
00177          splits[isplit]==b && isplit<nsplits; isplit+=SPLIT_BLOCK) ;
00178     split_end = isplit-SPLIT_BLOCK;
00179     if (isplit>=nsplits)
00180       next_block = N;
00181     else
00182       next_block = splits[isplit/* +1 ??? */];
00183     nnext = isplit-split_start+SPLIT_BLOCK;
00184     ierr = PetscMalloc(nnext*sizeof(int),&conts); CHKERRQ(ierr);
00185     contd = 0;
00186     for (level=3; level>=1; level--)
00187       for (isplit=split_end; isplit>=split_start; isplit-=SPLIT_BLOCK)
00188         if (splits[isplit+2]==level) {
00189           ierr = PetscMemcpy
00190             (conts+contd,splits+isplit,SPLIT_BLOCK*sizeof(int)); CHKERRQ(ierr);
00191           contd += SPLIT_BLOCK;}
00192     if (next_block==N) {
00193       nnext -= SPLIT_BLOCK;
00194     } else {
00195       conts[contd] = b; conts[contd+1] = next_block; conts[contd+2] = 1;}
00196 #if TRACE_MSGS
00197     PrintArray(conts,nnext,"continuations");
00198 #endif
00199   }
00200   for (next=0; next<nnext; next+=SPLIT_BLOCK) {
00201 #ifdef TRACE_MSGS
00202     if (next==0)
00203       printf("go on to level %d\n",lev+1);
00204     else
00205       printf("next attempt at level %d\n",lev+1);
00206 #endif
00207     ierr = CanChain
00208       (lev+1,conts[next+1],N,splits,nsplits,chain,len+1,q,rar); CHKERRQ(ierr);
00209     if (rar) break;
00210   }
00211   PetscFunctionReturn(0);
00212 }
00213 
00214 #undef __FUNCT__
00215 #define __FUNCT__ "CondenseSplits"
00216 int CondenseSplits(int p,IS *splits)
00217 {
00218   IS newsplits; int lchain,*chain,ierr;
00219   PetscFunctionBegin;
00220   ierr = ISGetSize(*splits,&lchain); CHKERRQ(ierr);
00221   ierr = ISGetIndices(*splits,(const PetscInt**)&chain); CHKERRQ(ierr);
00222   ierr = CondenseChain(chain,lchain-1,p,&newsplits); CHKERRQ(ierr);
00223   ierr = ISRestoreIndices(*splits,(const PetscInt**)&chain); CHKERRQ(ierr);
00224   ierr = ISDestroy(*splits); CHKERRQ(ierr);
00225   *splits = newsplits;
00226   PetscFunctionReturn(0);
00227 }
00228 
00229 #undef __FUNCT__
00230 #define __FUNCT__ "ChainSplits"
00231 /* ChainSplits: recursively find all chains of split points
00232  * of at least the wanted length,
00233  * and yield up the highest quality one, suitably contracted.
00234  */
00235 static PetscErrorCode ChainSplits(int N,int *splits,int s_top,int p,IS *rar)
00236 {
00237   int *chain,ierr; IS isret;
00238 
00239   PetscFunctionBegin;
00240   ierr = PetscMalloc(s_top*sizeof(int),&chain); CHKERRQ(ierr);
00241   mq = 0.; ml = 0; nc = 0;
00242   ierr = CanChain(0,0,N,splits,s_top,chain,0,0,&isret); CHKERRQ(ierr);
00243   if (!isret) SETERRQ(1,"Did not deliver a chain");
00244   ierr = PetscFree(chain); CHKERRQ(ierr);
00245   if (p) {ierr = CondenseSplits(p,&isret); CHKERRQ(ierr);}
00246 #if TRACE_MSGS
00247   printf("final chain\n"); ISView(isret,0);
00248 #endif
00249   *rar = isret;
00250   PetscFunctionReturn(0);
00251 }
00252 
00253 #define VERY_FIRST_ROW (isFirst && isplit==0)
00254 #define VERY_LAST_ROW (isLast && isplit==nsplits-1)
00255 
00256 #undef __FUNCT__
00257 #define __FUNCT__ "GetUpBiDiagSplits"
00258 static PetscErrorCode GetUpBiDiagSplits
00259 (Mat A,int first,int last,int isFirst,int isLast,
00260  int **ar,int *n_ar,int **Ar,int *N_ar)
00261 {
00262   MPI_Comm comm; PetscTruth candidate;
00263   int *split_array,nsplits,*Split_array,Nsplits, local_size,row,ierr;
00264 
00265   PetscFunctionBegin;
00266   ierr = PetscObjectGetComm((PetscObject)A,&comm); CHKERRQ(ierr);
00267   local_size = last-first;
00268   ierr = PetscMalloc(local_size*sizeof(int),&split_array); CHKERRQ(ierr);
00269   nsplits = 0;
00270   if (isFirst) candidate=PETSC_TRUE;
00271   for (row=first; row<last; row++) {
00272     int ncols,icol; const int *cols; 
00273     ierr = MatGetRow(A,row,&ncols,&cols,PETSC_NULL); CHKERRQ(ierr);
00274     for (icol=0; icol<ncols; icol++) {
00275       if (cols[icol]==row) {
00276         if (candidate && icol<ncols-1 && cols[icol+1]==row+1)
00277           split_array[nsplits++] = row;
00278         candidate = TRUTH(icol==ncols-1 || cols[icol+1]>row+1);
00279         break;
00280       }
00281       if (cols[icol]>row) break;
00282     }
00283     ierr = MatRestoreRow(A,row,&ncols,&cols,PETSC_NULL); CHKERRQ(ierr);
00284   }
00285   if (isLast) split_array[nsplits++] = last;
00286 
00287   ierr = MPIAllGatherIntV
00288     (split_array,nsplits,&Split_array,&Nsplits,comm); CHKERRQ(ierr);
00289 #if TRACE_MSGS
00290   {
00291     int i;
00292     printf("local upbidiag splits: ");
00293     for (i=0; i<nsplits; i++) {printf("%d,",split_array[i]);}
00294     printf("\n");
00295     printf("global upbidiag splits: ");
00296     for (i=0; i<Nsplits; i++) {printf("%d,",Split_array[i+1]);}
00297     printf("\n");
00298   }
00299 #endif
00300 
00301   *ar = split_array; *n_ar = nsplits; *Ar = Split_array; *N_ar = Nsplits;
00302 
00303   PetscFunctionReturn(0);
00304 }
00305 
00306 #undef __FUNCT__
00307 #define __FUNCT__ "MatSplitPoints"
00308 int MatSplitPoints(Mat A,int np,IS *splitpoints)
00309 {
00310   MPI_Comm comm;
00311   int M,N,ntids,mytid, first,last, isFirst,isLast,
00312     nsplits,Nsplits,*split_array,*Split_array,
00313     *Splits,S_top,
00314     ierr;
00315 
00316   PetscFunctionBegin;
00317 
00318   ierr = PetscObjectGetComm((PetscObject)A,&comm); CHKERRQ(ierr);
00319   MPI_Comm_size(comm,&ntids); MPI_Comm_rank(comm,&mytid);
00320   isFirst = (mytid==0); isLast = (mytid==ntids-1);
00321   ierr = MatGetSize(A,&M,&N); CHKERRQ(ierr);
00322   ierr = MatGetOwnershipRange(A,&first,&last); CHKERRQ(ierr);
00323 
00324   ierr = GetUpBiDiagSplits
00325     (A,first,last,isFirst,isLast,
00326      &split_array,&nsplits,&Split_array,&Nsplits); CHKERRQ(ierr);
00327   Split_array++; /* zero location is size */
00328 
00329   {
00330     IS *isses,*jsses; Mat *subs;
00331     int *splits,r_top,s_top, isplit,jsplit,*jsplits,*joffsets,
00332       ntest,nsave,*status;
00333 
00334 #define SET_J j = Split_array[jsplit];
00335 #define SET_TS  ts = PetscMax(PetscMin(bs/10,i),1);
00336 #define SET_TSS \
00337       tsr = PetscMin(ts,N-j); tsd = PetscMin(ts,M-i); tsc = PetscMin(ts,M-i);
00338 #define SET_LAST_COL \
00339       ierr = MatGetRow(A,i,&ncols,&cols,PETSC_NULL); CHKERRQ(ierr); \
00340       lastcol = cols[ncols-1]; \
00341       ierr = MatRestoreRow(A,i,&ncols,&cols,PETSC_NULL); CHKERRQ(ierr);
00342 #define IF_TOO_FAR_RIGHT_BREAK if (j-ts>lastcol) break;
00343     /* 
00344      * what is the number of splits for each of the local splits?
00345      * (we only need this for efficient alloation of the "status" array;
00346      * straightforward allocation of nsplits*Nsplits is far too much.)
00347      */
00348     ierr = PetscMalloc(nsplits*sizeof(int),&joffsets); CHKERRQ(ierr);
00349     ierr = PetscMemzero(joffsets,nsplits*sizeof(int)); CHKERRQ(ierr);
00350     ierr = PetscMalloc((nsplits+1)*sizeof(int),&jsplits); CHKERRQ(ierr);
00351     ierr = PetscMemzero(jsplits,(nsplits+1)*sizeof(int)); CHKERRQ(ierr);
00352     jsplits[0] = 0;
00353     for (isplit=0; isplit<nsplits; isplit++) {
00354       int i = split_array[isplit],ncols,lastcol,njsplits=0; const int *cols;
00355       if (!VERY_LAST_ROW) {
00356         SET_LAST_COL;
00357         for (jsplit=0; jsplit<Nsplits; jsplit++) {
00358           int j,bs,ts;
00359           SET_J; bs = j-i; SET_TS;
00360           if (bs<=0) {joffsets[isplit]++; continue;}
00361           IF_TOO_FAR_RIGHT_BREAK;
00362           njsplits++; /* count how many splits we actually use */
00363         }
00364       }
00365       jsplits[isplit+1] = jsplits[isplit]+njsplits;
00366     }
00367     ierr = PetscMalloc(jsplits[nsplits]*sizeof(int),&status); CHKERRQ(ierr);
00368     ierr = PetscMemzero(status,jsplits[nsplits]*sizeof(int)); CHKERRQ(ierr);
00369 #define LEFTDOWN 1
00370 #define RIGHTDOWN 2
00371 #define LEFTUP 4
00372 #define RIGHTUP 8
00373 #define CORNERUP 16
00374 #define STATUS(isp,jsp) status[jsplits[isp]+(jsp-joffsets[isplit])]
00375 #define SET_STATUS(isp,jsp,s) STATUS(isp,jsp) += s
00376     ntest = 3*jsplits[nsplits];
00377 #if TRACE_MSGS
00378     printf("total # tests: %d\n",ntest);
00379 #endif
00380     ierr = PetscMalloc(ntest*sizeof(IS),&isses); CHKERRQ(ierr);
00381     ierr = PetscMalloc(ntest*sizeof(IS),&jsses); CHKERRQ(ierr);
00382 #define OVERFLOW_DOWN i+tsd-1>=last
00383     /* why did that say >=last-1 ? */
00384 #define OVERFLOW_UP i-ts<first
00385     ntest = 0; nsave = 0;
00386     for (isplit=0; isplit<nsplits; isplit++) {
00387       int i=split_array[isplit],ncols,lastcol;
00388       const int *cols; PetscTruth flag;
00389       if (VERY_LAST_ROW) break;
00390 #if TRACE_MSGS > 1
00391       printf("from split %d=>%d: ",isplit,i);
00392 #endif
00393       SET_LAST_COL;
00394       for (jsplit=joffsets[isplit]; jsplit<Nsplits; jsplit++) {
00395         IS left,right,up,down,corner; int j,bs,ts,tsr,tsd,tsc;
00396         SET_J; bs = j-i; SET_TS; SET_TSS;
00397         IF_TOO_FAR_RIGHT_BREAK;
00398 #if TRACE_MSGS > 1
00399         printf("split %d=>%d, ",jsplit,j);
00400 #endif
00401         if ((OVERFLOW_DOWN) || (OVERFLOW_UP)) {
00402           ierr = ISCreateStride(MPI_COMM_SELF,tsr,j,1,&right); CHKERRQ(ierr);
00403           ierr = ISCreateStride(MPI_COMM_SELF,ts,j-ts,1,&left); CHKERRQ(ierr);}
00404         if (!VERY_LAST_ROW) {
00405           if (OVERFLOW_DOWN) {
00406             /*printf("saving down i=%d j=%d\n",i,j);*/
00407             ierr = ISCreateStride(MPI_COMM_SELF,tsd,i,1,&down); CHKERRQ(ierr);
00408             isses[nsave] = left; jsses[nsave] = down; nsave++;
00409             isses[nsave] = right; jsses[nsave] = down; nsave++;
00410           } else {
00411             ierr = MatSubMatIsNull
00412               (A,i,i+tsd-1,j-ts,j-1,&flag); CHKERRQ(ierr);
00413 #if TRACE_MSGS > 1
00414             printf("submat (%d,%d) x (%d,%d) : n=%d,  ",
00415                    i,i+tsd-1,j-ts,j-1,flag);
00416 #endif
00417             SET_STATUS(isplit,jsplit,flag*LEFTDOWN);
00418             ierr = MatSubMatIsNull
00419               (A,i,i+tsd-1,j,j+tsr-1,&flag); CHKERRQ(ierr);
00420 #if TRACE_MSGS > 1
00421             printf("submat (%d,%d) x (%d,%d) : n=%d,  ",
00422                    i,i+tsd-1,j,j+tsr-1,flag);
00423 #endif
00424             SET_STATUS(isplit,jsplit,flag*RIGHTDOWN);
00425           }
00426           ntest += 2;
00427         }
00428         if (!VERY_FIRST_ROW) {
00429           if (OVERFLOW_UP) {
00430             ierr = ISCreateStride(MPI_COMM_SELF,ts,i-ts,1,&up); CHKERRQ(ierr);
00431             ierr = ISCreateStride(MPI_COMM_SELF,tsc,i,1,&corner); CHKERRQ(ierr);
00432             /*printf("saving up i=%d j=%d\n",i,j);*/
00433             isses[nsave] = left; jsses[nsave] = up; nsave++;
00434             isses[nsave] = right; jsses[nsave] = up; nsave++;
00435             isses[nsave] = corner; jsses[nsave] = up; nsave++;
00436           } else {
00437             ierr = MatSubMatIsNull
00438               (A,i-ts,i-1,j-ts-1,j-1,&flag); CHKERRQ(ierr);
00439 #if TRACE_MSGS > 1
00440             printf("submat (%d,%d) x (%d,%d) : n=%d,  ",
00441                    i-ts,i-1,j-ts-1,j-1,flag);
00442 #endif
00443             SET_STATUS(isplit,jsplit,flag*LEFTUP);
00444             ierr = MatSubMatIsNull
00445               (A,i-ts,i-1,j,j+tsr-1,&flag); CHKERRQ(ierr);
00446             SET_STATUS(isplit,jsplit,flag*RIGHTUP);
00447             ierr = MatSubMatIsNull
00448               (A,i-ts,i-1,i,i+tsc-1,&flag); CHKERRQ(ierr);
00449             SET_STATUS(isplit,jsplit,flag*CORNERUP);
00450           }
00451           ntest += 3;
00452         }
00453       }
00454 #if TRACE_MSGS > 1
00455       printf("\n\n");
00456 #endif
00457     }
00458 
00459 #if TRACE_MSGS
00460     printf("saved %d straddling matrices out of %d tests\n",nsave,ntest);
00461 #endif
00462     ierr = MatGetSubMatrices
00463       (A,nsave,isses,jsses,MAT_INITIAL_MATRIX,&subs); CHKERRQ(ierr);
00464 
00465     ierr = PetscMalloc(SPLIT_BLOCK*ntest*sizeof(int),&splits); CHKERRQ(ierr);
00466     ntest = 0; r_top = 0; s_top = 0;
00467     for (isplit=0; isplit<nsplits; isplit++) {
00468       int i=split_array[isplit],ncols,lastcol,restart=1,split=1,c=0;
00469       const int *cols;
00470       if (VERY_LAST_ROW) break;
00471 #if TRACE_MSGS > 1
00472       printf("from split %d=>%d: ",isplit,i);
00473 #endif
00474       SET_LAST_COL;
00475       for (jsplit=joffsets[isplit]; jsplit<Nsplits; jsplit++) {
00476         int j,bs,ts,tsr,tsd,tsc,rank=0;
00477         SET_J; bs = j-i; SET_TS; SET_TSS;
00478 #if TRACE_MSGS > 1
00479         if (j-ts>lastcol) printf("%d:a ",j);
00480 #endif
00481         IF_TOO_FAR_RIGHT_BREAK;
00482         if (!VERY_LAST_ROW) {
00483           Mat leftdown,rightdown; int flag_leftdown,flag_rightdown;
00484           if (OVERFLOW_DOWN) {
00485             leftdown = subs[ntest++]; rightdown = subs[ntest++];
00486             ierr = MatIsNull(leftdown,&flag_leftdown); CHKERRQ(ierr);
00487             SET_STATUS(isplit,jsplit,flag_leftdown*LEFTDOWN);
00488             ierr = MatIsNull(rightdown,&flag_rightdown); CHKERRQ(ierr);
00489             SET_STATUS(isplit,jsplit,flag_rightdown*RIGHTDOWN);
00490           } else {
00491             flag_leftdown = STATUS(isplit,jsplit) & LEFTDOWN;
00492             flag_rightdown = STATUS(isplit,jsplit) & RIGHTDOWN;
00493           }
00494           restart = flag_leftdown && (!flag_rightdown || j==N);
00495 #if TRACE_MSGS > 1
00496           if (restart) printf("%d:dn:r, ",j); else printf("%d:dn:x, ",j);
00497 #endif
00498           if (restart) rank=1;
00499         }
00500         if (VERY_FIRST_ROW) {
00501           if (rank) rank=3;
00502         } else {
00503           Mat leftup,rightup,corner; int flag_leftup,flag_rightup,flag_corner;
00504           if (OVERFLOW_UP) {
00505             leftup = subs[ntest++]; rightup = subs[ntest++];
00506             ierr = MatIsNull(leftup,&flag_leftup); CHKERRQ(ierr);
00507             ierr = MatIsNull(rightup,&flag_rightup); CHKERRQ(ierr);
00508             corner = subs[ntest++];
00509             ierr = MatIsNull(corner,&flag_corner); CHKERRQ(ierr);
00510           } else {
00511             flag_leftup = STATUS(isplit,jsplit) & LEFTUP;
00512             flag_rightup = STATUS(isplit,jsplit) & RIGHTUP;
00513             flag_corner = STATUS(isplit,jsplit) & CORNERUP;
00514           }
00515           split = (flag_rightup || j==N) && !flag_leftup;
00516 #if TRACE_MSGS > 1
00517           if (split) printf("%d:up:s, ",j); else printf("%d:up:x, ",j);
00518 #endif
00519           if (rank && split) rank=2;
00520           if (rank==2 && flag_corner) rank=3;
00521         }
00522         if (rank>0) {
00523           c++;
00524           splits[s_top] = i; splits[s_top+1] = j; splits[s_top+2] = rank;
00525           s_top+=SPLIT_BLOCK;}
00526       }
00527       if (!c) { /* append next split as default case */
00528         splits[s_top] = i; splits[s_top+1] = split_array[isplit+1];
00529         splits[s_top+2] = 1; s_top+=SPLIT_BLOCK;}
00530 #if TRACE_MSGS > 1
00531       printf("\n");
00532 #endif
00533     }
00534     splits[s_top] = splits[s_top-SPLIT_BLOCK+1]; s_top++;
00535     splits[s_top++] = N; splits[s_top++] = 1;
00536 #if TRACE_MSGS > 1
00537     {
00538       int i;
00539       printf("local final splits: ");
00540       for (i=0; i<s_top; i+=SPLIT_BLOCK) {
00541         printf("[%d,%d],",splits[i],splits[i+1]);}
00542       printf("\n");}
00543 #endif
00544     ierr = MPIAllGatherIntV(splits,s_top,&Splits,&S_top,comm); CHKERRQ(ierr);
00545     Splits++;
00546 #ifdef TRACE_MSGS
00547     if (mytid==0) {
00548       int i;
00549       printf("global final splits: ");
00550       for (i=0; i<S_top; i+=SPLIT_BLOCK) {
00551         printf("[%d,%d:%d],",Splits[i],Splits[i+1],Splits[i+2]);
00552       }
00553       printf("\n");}
00554 #endif
00555     ierr = PetscFree(splits); CHKERRQ(ierr);
00556     ierr = PetscFree(joffsets); CHKERRQ(ierr);
00557     ierr = PetscFree(jsplits); CHKERRQ(ierr);
00558 
00559     for (isplit=0; isplit<nsave; isplit++) {
00560       ierr = ISDestroy(isses[isplit]); CHKERRQ(ierr);
00561       if (isplit>0 && (jsses[isplit]!=jsses[isplit-1])) {
00562         ierr = ISDestroy(jsses[isplit]); CHKERRQ(ierr);
00563       }
00564     }
00565     PetscFree(isses); PetscFree(jsses);
00566 
00567     ierr = MatDestroyMatrices(ntest,&subs); CHKERRQ(ierr); /*also frees subs*/
00568   }
00569   {
00570     IS isret;
00571     ierr = ChainSplits(N,Splits,S_top,np,&isret); CHKERRQ(ierr);
00572     *splitpoints = isret;
00573   }
00574   Splits--;
00575   ierr = PetscFree(Splits); CHKERRQ(ierr);
00576 
00577   PetscFunctionReturn(0);
00578 }
00579 
00580 #undef __FUNCT__
00581 #define __FUNCT__ "MatRedistribute"
00582 int MatRedistribute(Mat *A,IS splits)
00583 {
00584   MPI_Comm comm; Mat B;
00585   int ntids,mytid,local_size,ierr;
00586 
00587   PetscFunctionBegin;
00588   ierr = PetscObjectGetComm((PetscObject)*A,&comm); CHKERRQ(ierr);
00589   MPI_Comm_size(comm,&ntids); MPI_Comm_rank(comm,&mytid);
00590   {
00591     /* find the improved distribution, return the new local_size */
00592     int *indices,chck;
00593     ierr = ISGetLocalSize(splits,&chck); CHKERRQ(ierr);
00594     if (chck!=ntids+1) SETERRQ(1,"Set of split points wrong length");
00595     ierr = ISGetIndices(splits,(const PetscInt**)&indices); CHKERRQ(ierr);
00596     local_size = indices[mytid+1]-indices[mytid];
00597     ierr = ISRestoreIndices(splits,(const PetscInt**)&indices); CHKERRQ(ierr);
00598   }
00599   {
00600     /* redistribute the matrix, unless the distribution is already as wanted */
00601     int perfect,gperfect,first,last,row;
00602     ierr = MatGetOwnershipRange(*A,&first,&last); CHKERRQ(ierr);
00603     perfect = local_size==(last-first);
00604     MPI_Allreduce(&perfect,&gperfect,1,MPI_INT,MPI_PROD,comm);
00605     if (gperfect) PetscFunctionReturn(0);
00606     ierr = MatCreateMPIAIJ /* optimise the d_ and o_ parameters ! */
00607       (comm,local_size,local_size,PETSC_DECIDE,PETSC_DECIDE,
00608        5,0,5,0,&B); CHKERRQ(ierr);
00609     for (row=first; row<last; row++) {
00610       int ncols; const int *cols; const PetscScalar *vals;
00611       ierr = MatGetRow(*A,row,&ncols,&cols,&vals); CHKERRQ(ierr);
00612       ierr = MatSetValues
00613         (B,1,&row,ncols,cols,vals,INSERT_VALUES); CHKERRQ(ierr);
00614       ierr = MatRestoreRow(*A,row,&ncols,&cols,&vals); CHKERRQ(ierr);
00615     }
00616     ierr = MatAssemblyBegin(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
00617     ierr = MatAssemblyEnd(B,MAT_FINAL_ASSEMBLY); CHKERRQ(ierr);
00618   }
00619   ierr = MatDestroy(*A); CHKERRQ(ierr);
00620   *A = B;
00621   PetscFunctionReturn(0);
00622 }
00623 int VecRedistribute(Vec *V,IS splits)
00624 {
00625   MPI_Comm comm; Vec W;
00626   int ntids,mytid,local_size,new_first,ierr;
00627 
00628   PetscFunctionBegin;
00629   ierr = PetscObjectGetComm((PetscObject)*V,&comm); CHKERRQ(ierr);
00630   MPI_Comm_size(comm,&ntids); MPI_Comm_rank(comm,&mytid);
00631   {
00632     /* find the new local_size */
00633     int *indices;
00634     ierr = ISGetIndices(splits,(const PetscInt**)&indices); CHKERRQ(ierr);
00635     local_size = indices[mytid+1]-indices[mytid]; new_first = indices[mytid];
00636     ierr = ISRestoreIndices(splits,(const PetscInt**)&indices); CHKERRQ(ierr);
00637   }
00638   {
00639     /* redistribute the matrix, unless the distribution is already as wanted */
00640     int perfect,gperfect,first,last;
00641     ierr = VecGetOwnershipRange(*V,&first,&last); CHKERRQ(ierr);
00642     perfect = local_size==(last-first);
00643     MPI_Allreduce(&perfect,&gperfect,1,MPI_INT,MPI_PROD,comm);
00644     if (gperfect) PetscFunctionReturn(0);
00645     ierr = VecCreateMPI
00646       (comm,local_size,PETSC_DECIDE,&W); CHKERRQ(ierr);
00647     {
00648       PetscScalar *elements; int *range,i;
00649       ierr = VecGetArray(*V,&elements); CHKERRQ(ierr);
00650       ierr = PetscMalloc(local_size*sizeof(int),&range); CHKERRQ(ierr);
00651       for (i=first; i<last; i++) range[i-first] = i;
00652       ierr = VecSetValues
00653         (W,last-first,range,elements,INSERT_VALUES); CHKERRQ(ierr);
00654       ierr = PetscFree(range); CHKERRQ(ierr);
00655       ierr = VecRestoreArray(*V,&elements); CHKERRQ(ierr);
00656     }
00657     ierr = VecAssemblyBegin(W); CHKERRQ(ierr);
00658     ierr = VecAssemblyEnd(W); CHKERRQ(ierr);
00659   }
00660   ierr = VecDestroy(*V); CHKERRQ(ierr);
00661   *V = W;
00662 
00663   PetscFunctionReturn(0);
00664 }
00665 
00666 /*
00667  * Splits
00668  */
00669 #undef __FUNCT__
00670 #define __FUNCT__ "compute_icm_splits"
00671 static PetscErrorCode compute_icm_splits(Mat A)
00672 {
00673   MPI_Comm comm; IS splits; int *indices,ns,*ss,id,ntids; 
00674   PetscTruth has; PetscErrorCode ierr;
00675   PetscFunctionBegin;
00676 
00677   ierr = PetscObjectGetComm((PetscObject)A,&comm); CHKERRQ(ierr);
00678   ierr = MPI_Comm_size(comm,&ntids); CHKERRQ(ierr);
00679   ierr = MatSplitPoints(A,0,&splits); CHKERRQ(ierr);
00680 
00681   ierr = ISGetSize(splits,&ns); CHKERRQ(ierr);
00682   ierr = ISGetIndices(splits,(const PetscInt**)&indices); CHKERRQ(ierr);
00683   ierr = PetscMalloc((ns+1)*sizeof(int),&ss); CHKERRQ(ierr);
00684   ierr = PetscMemcpy(ss+1,indices,ns*sizeof(int)); CHKERRQ(ierr);
00685   ss[0] = ns;
00686   ierr = ISRestoreIndices(splits,(const PetscInt**)&indices); CHKERRQ(ierr);
00687   ierr = ISDestroy(splits); CHKERRQ(ierr);
00688 
00689   ierr = GetDataID("icmk","n-splits",&id,&has); CHKERRQ(ierr);
00690   HASTOEXIST(has);
00691   ierr = PetscObjectComposedDataSetInt
00692     ((PetscObject)A,id,ns); CHKERRQ(ierr);
00693   ierr = GetDataID("icmk","splits",&id,&has); CHKERRQ(ierr);
00694   HASTOEXIST(has);
00695   ierr = PetscObjectComposedDataSetIntstar
00696     ((PetscObject)A,id,ss); CHKERRQ(ierr);
00697 
00698   PetscFunctionReturn(0);
00699 }
00700 
00701 #undef __FUNCT__
00702 #define __FUNCT__ "NSplits"
00703 static PetscErrorCode NSplits
00704 (AnaModNumericalProblem prob,AnalysisItem *rv,int *dummy,PetscTruth *flg)
00705 {
00706   Mat A = (Mat)prob;
00707   int v; PetscTruth has; int id; PetscErrorCode ierr;
00708   PetscFunctionBegin;
00709   ierr = GetDataID("icmk","n-splits",&id,&has); CHKERRQ(ierr);
00710   HASTOEXIST(has);
00711   ierr = PetscObjectComposedDataGetInt
00712     ((PetscObject)A,id,v,*flg); CHKERRQ(ierr);
00713   if (*flg) rv->i = v;
00714   else {
00715     ierr = compute_icm_splits(A); CHKERRQ(ierr);
00716     ierr = PetscObjectComposedDataGetInt
00717       ((PetscObject)A,id,v,*flg); CHKERRQ(ierr);
00718     if (*flg) rv->i = v;
00719   }
00720   
00721   PetscFunctionReturn(0);
00722 }
00723 
00724 #undef __FUNCT__
00725 #define __FUNCT__ "Splits"
00726 static PetscErrorCode Splits
00727 (AnaModNumericalProblem prob,AnalysisItem *rv,int *dummy, PetscTruth *flg)
00728 {
00729   Mat A = (Mat)prob;
00730   int *v; PetscTruth has; int id; PetscErrorCode ierr;
00731   PetscFunctionBegin;
00732   ierr = GetDataID("icmk","splits",&id,&has); CHKERRQ(ierr);
00733   HASTOEXIST(has);
00734   ierr = PetscObjectComposedDataGetIntstar
00735     ((PetscObject)A,id,v,*flg); CHKERRQ(ierr);
00736   if (*flg) rv->ii = v;
00737   else {
00738     ierr = compute_icm_splits(A); CHKERRQ(ierr);
00739     ierr = PetscObjectComposedDataGetIntstar
00740       ((PetscObject)A,id,v,*flg); CHKERRQ(ierr);
00741     if (*flg) rv->ii = v;
00742   }
00743   
00744   PetscFunctionReturn(0);
00745 }
00746 
00747 #undef __FUNCT__
00748 #define __FUNCT__ "RegisterICMKModules"
00749 PetscErrorCode RegisterICMKModules(void)
00750 {
00751   PetscErrorCode ierr;
00752   PetscFunctionBegin;
00753 
00754   ierr = RegisterModule
00755     ("icmk","n-splits",ANALYSISINTEGER,&NSplits); CHKERRQ(ierr);
00756   ierr = RegisterModule
00757     ("icmk","splits",ANALYSISINTARRAY,&Splits); CHKERRQ(ierr);
00758 
00759   PetscFunctionReturn(0);
00760 }