使用MPI在C中发送二维数组块

你如何发送二维数组块到不同的处理器? 假设二维数组大小为400×400,我想发送100X100大小的块到不同的处理器。 这个想法是,每个处理器将在其单独的块上执行计算,并将其结果发送回第一个处理器以得到最终结果。
我在C程序中使用MPI。

首先让我说你一般不想这么做 – 从一些“主”过程中分散和收集大量的数据。 通常情况下,您希望每个任务都能摆脱自己的难题,并且您应该致力于永远不要让一个处理器需要全局数据的“全局视图” 只要你需要,你限制了可扩展性和问题的大小。 如果你正在为I / O做这件事 – 一个进程读取数据,然后分散它,然后把它收集起来写入,你最终会想看看MPI-IO。

但是,解决你的问题,MPI有非常好的方式来将任意数据从内存中取出,并将其分散/收集到一组处理器中。 不幸的是,这需要相当数量的MPI概念 – MPI类型,范围和集体操作。 MPI_Type_create_subarray和MPI_Gather在这个问题的答案中讨论了很多基本思想。

更新 – 在寒冷的白天,这是很多的代码,而不是很多的解释。 所以让我扩大一点。

考虑一个1d的整数全局数组,任务0拥有你想要分配给多个MPI任务的权限,这样他们每个人都可以在本地数组中获得一块。 假设你有4个任务,全局数组是[01234567] 。 你可以让任务0发送四条消息(包括一条本身)来分发这个消息,当它重新组装时,接收四条消息将它们捆绑在一起; 但是在大量的流程中显然会非常耗时。 有这些操作的优化例程 – 分散/收集操作。 所以在这个案例中,你可以这样做:

 int global[8]; /* only task 0 has this */ int local[2]; /* everyone has this */ const int root = 0; /* the processor with the initial global data */ if (rank == root) { for (int i=0; i<7; i++) global[i] = i; } MPI_Scatter(global, 2, MPI_INT, /* send everyone 2 ints from global */ local, 2, MPI_INT, /* each proc receives 2 ints into local */ root, MPI_COMM_WORLD); /* sending process is root, all procs in */ /* MPI_COMM_WORLD participate */ 

之后,处理器的数据看起来就像

 task 0: local:[01] global: [01234567] task 1: local:[23] global: [garbage-] task 2: local:[45] global: [garbage-] task 3: local:[67] global: [garbage-] 

也就是说,分散操作采用全局数组并向所有处理器发送连续的2-int块。

为了重新组装数组,我们使用MPI_Gather()操作,该操作完全相同,但是相反:

 for (int i=0; i<2; i++) local[i] = local[i] + rank; MPI_Gather(local, 2, MPI_INT, /* everyone sends 2 ints from local */ global, 2, MPI_INT, /* root receives 2 ints each proc into global */ root, MPI_COMM_WORLD); /* recv'ing process is root, all procs in */ /* MPI_COMM_WORLD participate */ 

现在的数据看起来像

 task 0: local:[01] global: [0134679a] task 1: local:[34] global: [garbage-] task 2: local:[67] global: [garbage-] task 3: local:[9a] global: [garbage-] 

Gather把所有的数据都带回来了,这里是10,因为在开始这个例子的时候,我并没有仔细考虑过我的格式。

如果数据点的数量没有平均分配进程数量,我们需要向每个进程发送不同数量的项目,会发生什么情况? 然后你需要一个通用的分散版本MPI_Scatterv() ,它可以让你指定每个处理器的计数,以及位移 – 在全局数组中启动数据的位置。 假设你有一个由9个字符组成的字符数组[abcdefghi] ,你将为每个进程分配两个字符,除了最后一个字符以外,有三个字符。 那么你需要

 char global[9]; /* only task 0 has this */ char local[3]={'-','-','-'}; /* everyone has this */ int mynum; /* how many items */ const int root = 0; /* the processor with the initial global data */ if (rank == 0) { for (int i=0; i<8; i++) global[i] = 'a'+i; } int counts[4] = {2,2,2,3}; /* how many pieces of data everyone has */ mynum = counts[rank]; int displs[4] = {0,2,4,6}; /* the starting point of everyone's data */ /* in the global array */ MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] pts from displs[i] */ MPI_INT, local, mynum, MPI_INT; /* I'm receiving mynum MPI_INTs into local */ root, MPI_COMM_WORLD); 

现在数据看起来像

 task 0: local:[ab-] global: [abcdefghi] task 1: local:[cd-] global: [garbage--] task 2: local:[ef-] global: [garbage--] task 3: local:[ghi] global: [garbage--] 

您现在已经使用scatterv来分配不规则的数据量。 每种情况下的位移是从数组开始的两个*等级(以字符测量的;位移以散列发送的类型或聚集接收的类型的单位为单位,通常不以字节为单位),而计数是{2,2,2,3}。 如果它是我们想要的3个字符的第一个处理器,我们可以设置计数= {3,2,2,2},位移将是{0,3,5,7}。 Gatherv再次作品完全相同,但相反; 计数和显示数组将保持不变。

现在,对于2D来说,这有点棘手。 如果我们想发送2d数组的2d个子块,我们现在发送的数据不再是连续的。 如果我们发送(比如说)一个6×6阵列的3×3子块到4个处理器,我们发送的数据就有了漏洞:

 2D Array --------- |000|111| |000|111| |000|111| |---+---| |222|333| |222|333| |222|333| --------- Actual layout in memory [000111000111000111222333222333222333] 

(请注意,所有高性能计算归结为了解内存中数据的布局。)

如果我们想要将标记为“1”的数据发送给任务1,则需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值。 第二个复杂情况是分区域停止和开始的地方; 注意区域“1”在区域“0”停止的地方不开始; 在区域“0”的最后一个元素之后,存储器中的下一个位置是通过区域“1”的中途。

我们首先解决第一个布局问题 – 如何提取我们想要发送的数据。 我们总是可以将所有的“0”区域数据复制到另一个连续的数组中,然后发送; 如果我们仔细地计划好了,我们甚至可以这样做,我们可以调用MPI_Scatter来获得结果。 但是我们宁愿不必以这种方式转换整个主数据结构。

到目前为止,我们使用的所有MPI数据类型都是简单的–MPI_INT指定(假设)连续4个字节。 但是,MPI允许您创建自己的数据类型,用于描述内存中任意复杂的数据布局。 而这种情况下 – 矩阵的矩形子区域 – 已经足够普遍,有一个特定的要求。 对于上面描述的二维情况,

  MPI_Datatype newtype; int sizes[2] = {6,6}; /* size of global array */ int subsizes[2] = {3,3}; /* size of sub-region */ int starts[2] = {0,0}; /* let's say we're looking at region "0", which begins at index [0,0] */ MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &newtype); MPI_Type_commit(&newtype); 

这将创建一个从全局数组中仅挑选出区域“0”的类型; 我们现在可以把这一块数据发送给另一个处理器

  MPI_Send(&(global[0][0]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "0" */ 

并且接收进程可以将其接收到本地数组中。 请注意,接收过程,如果它只接收到一个3×3数组, 不能描述它接收的是一种新类型; 不再描述内存布局。 相反,它只是接收一个3 * 3 = 9整数的块:

  MPI_Recv(&(local[0][0]), 3*3, MPI_INT, 0, tag, MPI_COMM_WORLD); 

请注意,我们也可以为其他子区域执行此操作,可以通过为其他块创建不同的类型(具有不同的start数组),或者通过在特定块的起始位置发送:

  MPI_Send(&(global[0][3]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "1" */ MPI_Send(&(global[3][0]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "2" */ MPI_Send(&(global[3][3]), 1, newtype, dest, tag, MPI_COMM_WORLD); /* region "3" */ 

最后,请注意,我们要求全球和本地在这里是连续的记忆块; 也就是&(global[0][0])&(local[0][0]) (或者等价地, *global*local指向连续的6 * 6和3 * 3的内存块;这是通过分配动态多维数组的常用方式来保证的,下面将介绍如何执行此操作。

现在我们已经理解了如何指定子区域,在使用分散/聚集操作之前还有一件事要讨论,那就是这些类型的“大小”。 我们不能仅仅使用这些类型的MPI_Scatter() (或者甚至是scatterv),因为这些类型具有16个整数的范围; 也就是说,在它们开始之后它们结束的位置是16个整数,并且它们结束的位置与下一个块的开始位置没有很好的对齐,所以我们不能只使用分散 – 它会选择错误的地方开始发送数据到下一个处理器。

当然,我们可以使用MPI_Scatterv()指定自己的位移,这就是我们要做的 – 除了位移是以发送类型为单位的,这对我们也没有帮助。 块从(0,3,18,21)个整数偏移量开始,从全局数组开始,块从这个数组开始处结束16个整数的事实并不让我们表达那些整数倍的位移。

为了解决这个问题,MPI可以让你为这些计算的目的设置类型的范围。 它不会截断类型; 它只是用来确定下一个元素开始给出最后一个元素的位置。 对于像这些孔中的类型,将范围设置为比内存中的距离小到实际类型的结尾通常比较方便。

我们可以将程度设定为对我们来说很方便的任何事情。 我们可以将范围1整数,然后以整数为单位设置位移。 在这种情况下,我喜欢将范围设置为3个整数 – 子行的大小 – 这样,块“1”立即在块“0”之后开始,并且块“3”在块“ 2" 。 不幸的是,当从块“2”跳到块“3”时,它不是很好地工作,但这不能被帮助。

因此,在这种情况下分散子块,我们会做以下几点:

  MPI_Datatype type, resizedtype; int sizes[2] = {6,6}; /* size of global array */ int subsizes[2] = {3,3}; /* size of sub-region */ int starts[2] = {0,0}; /* let's say we're looking at region "0", which begins at index [0,0] */ /* as before */ MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type); /* change the extent of the type */ MPI_Type_create_resized(type, 0, 3*sizeof(int), &resizedtype); MPI_Type_commit(&resizedtype); 

这里我们已经创建了和前面一样的块类型,但是我们调整了它的大小。 我们没有改变类型“开始”(0)的位置,但是我们改变了“结束”(3个整数)的位置。 我们之前没有提到这个,但是MPI_Type_commit需要能够使用这个类型; 但是您只需要提交实际使用的最终类型,而不是任何中间步骤。 完成后,您可以使用MPI_Type_free来释放该类型。

所以现在最后,我们可以分散块:上面的数据操作有点复杂,但一旦完成,scatterv看起来就像以前一样:

 int counts[4] = {1,1,1,1}; /* how many pieces of data everyone has, in units of blocks */ int displs[4] = {0,1,6,7}; /* the starting point of everyone's data */ /* in the global array, in block extents */ MPI_Scatterv(global, counts, displs, /* proc i gets counts[i] types from displs[i] */ resizedtype, local, 3*3, MPI_INT; /* I'm receiving 3*3 MPI_INTs into local */ root, MPI_COMM_WORLD); 

现在我们完成了,分散,聚集和MPI派生类型的一些小游览之后。

下面是一个示例代码,其中显示了收集和分散操作以及字符数组。 运行程序:

 $ mpirun -n 4 ./gathervarray Global array is: 0123456789 3456789012 6789012345 9012345678 2345678901 5678901234 8901234567 1234567890 4567890123 7890123456 Local process on rank 0 is: |01234| |34567| |67890| |90123| |23456| Local process on rank 1 is: |56789| |89012| |12345| |45678| |78901| Local process on rank 2 is: |56789| |89012| |12345| |45678| |78901| Local process on rank 3 is: |01234| |34567| |67890| |90123| |23456| Processed grid: AAAAABBBBB AAAAABBBBB AAAAABBBBB AAAAABBBBB AAAAABBBBB CCCCCDDDDD CCCCCDDDDD CCCCCDDDDD CCCCCDDDDD CCCCCDDDDD 

代码如下。

 #include <stdio.h> #include <math.h> #include <stdlib.h> #include "mpi.h" int malloc2dchar(char ***array, int n, int m) { /* allocate the n*m contiguous items */ char *p = (char *)malloc(n*m*sizeof(char)); if (!p) return -1; /* allocate the row pointers into the memory */ (*array) = (char **)malloc(n*sizeof(char*)); if (!(*array)) { free(p); return -1; } /* set up the pointers into the contiguous memory */ for (int i=0; i<n; i++) (*array)[i] = &(p[i*m]); return 0; } int free2dchar(char ***array) { /* free the memory - the first element of the array is at the start */ free(&((*array)[0][0])); /* free the pointers into the memory */ free(*array); return 0; } int main(int argc, char **argv) { char **global, **local; const int gridsize=10; // size of grid const int procgridsize=2; // size of process grid int rank, size; // rank of current process and no. of processes MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if (size != procgridsize*procgridsize) { fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize); MPI_Abort(MPI_COMM_WORLD,1); } if (rank == 0) { /* fill in the array, and print it */ malloc2dchar(&global, gridsize, gridsize); for (int i=0; i<gridsize; i++) { for (int j=0; j<gridsize; j++) global[i][j] = '0'+(3*i+j)%10; } printf("Global array is:\n"); for (int i=0; i<gridsize; i++) { for (int j=0; j<gridsize; j++) putchar(global[i][j]); printf("\n"); } } /* create the local array which we'll process */ malloc2dchar(&local, gridsize/procgridsize, gridsize/procgridsize); /* create a datatype to describe the subarrays of the global array */ int sizes[2] = {gridsize, gridsize}; /* global size */ int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize}; /* local size */ int starts[2] = {0,0}; /* where this one starts */ MPI_Datatype type, subarrtype; MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_CHAR, &type); MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(char), &subarrtype); MPI_Type_commit(&subarrtype); char *globalptr=NULL; if (rank == 0) globalptr = &(global[0][0]); /* scatter the array to all processors */ int sendcounts[procgridsize*procgridsize]; int displs[procgridsize*procgridsize]; if (rank == 0) { for (int i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1; int disp = 0; for (int i=0; i<procgridsize; i++) { for (int j=0; j<procgridsize; j++) { displs[i*procgridsize+j] = disp; disp += 1; } disp += ((gridsize/procgridsize)-1)*procgridsize; } } MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR, 0, MPI_COMM_WORLD); /* now all processors print their local data: */ for (int p=0; p<size; p++) { if (rank == p) { printf("Local process on rank %d is:\n", rank); for (int i=0; i<gridsize/procgridsize; i++) { putchar('|'); for (int j=0; j<gridsize/procgridsize; j++) { putchar(local[i][j]); } printf("|\n"); } } MPI_Barrier(MPI_COMM_WORLD); } /* now each processor has its local array, and can process it */ for (int i=0; i<gridsize/procgridsize; i++) { for (int j=0; j<gridsize/procgridsize; j++) { local[i][j] = 'A' + rank; } } /* it all goes back to process 0 */ MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_CHAR, globalptr, sendcounts, displs, subarrtype, 0, MPI_COMM_WORLD); /* don't need the local data anymore */ free2dchar(&local); /* or the MPI data type */ MPI_Type_free(&subarrtype); if (rank == 0) { printf("Processed grid:\n"); for (int i=0; i<gridsize; i++) { for (int j=0; j<gridsize; j++) { putchar(global[i][j]); } printf("\n"); } free2dchar(&global); } MPI_Finalize(); return 0; } 

我只是觉得更容易检查这种方式。

 #include <stdio.h> #include <math.h> #include <stdlib.h> #include "mpi.h" /* This is a version with integers, rather than char arrays, presented in this very good answer: http://stackoverflow.com/a/9271753/2411320 It will initialize the 2D array, scatter it, increase every value by 1 and then gather it back. */ int malloc2D(int ***array, int n, int m) { int i; /* allocate the n*m contiguous items */ int *p = malloc(n*m*sizeof(int)); if (!p) return -1; /* allocate the row pointers into the memory */ (*array) = malloc(n*sizeof(int*)); if (!(*array)) { free(p); return -1; } /* set up the pointers into the contiguous memory */ for (i=0; i<n; i++) (*array)[i] = &(p[i*m]); return 0; } int free2D(int ***array) { /* free the memory - the first element of the array is at the start */ free(&((*array)[0][0])); /* free the pointers into the memory */ free(*array); return 0; } int main(int argc, char **argv) { int **global, **local; const int gridsize=4; // size of grid const int procgridsize=2; // size of process grid int rank, size; // rank of current process and no. of processes int i, j, p; MPI_Init(&argc, &argv); MPI_Comm_size(MPI_COMM_WORLD, &size); MPI_Comm_rank(MPI_COMM_WORLD, &rank); if (size != procgridsize*procgridsize) { fprintf(stderr,"%s: Only works with np=%d for now\n", argv[0], procgridsize); MPI_Abort(MPI_COMM_WORLD,1); } if (rank == 0) { /* fill in the array, and print it */ malloc2D(&global, gridsize, gridsize); int counter = 0; for (i=0; i<gridsize; i++) { for (j=0; j<gridsize; j++) global[i][j] = ++counter; } printf("Global array is:\n"); for (i=0; i<gridsize; i++) { for (j=0; j<gridsize; j++) { printf("%2d ", global[i][j]); } printf("\n"); } } //return; /* create the local array which we'll process */ malloc2D(&local, gridsize/procgridsize, gridsize/procgridsize); /* create a datatype to describe the subarrays of the global array */ int sizes[2] = {gridsize, gridsize}; /* global size */ int subsizes[2] = {gridsize/procgridsize, gridsize/procgridsize}; /* local size */ int starts[2] = {0,0}; /* where this one starts */ MPI_Datatype type, subarrtype; MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_C, MPI_INT, &type); MPI_Type_create_resized(type, 0, gridsize/procgridsize*sizeof(int), &subarrtype); MPI_Type_commit(&subarrtype); int *globalptr=NULL; if (rank == 0) globalptr = &(global[0][0]); /* scatter the array to all processors */ int sendcounts[procgridsize*procgridsize]; int displs[procgridsize*procgridsize]; if (rank == 0) { for (i=0; i<procgridsize*procgridsize; i++) sendcounts[i] = 1; int disp = 0; for (i=0; i<procgridsize; i++) { for (j=0; j<procgridsize; j++) { displs[i*procgridsize+j] = disp; disp += 1; } disp += ((gridsize/procgridsize)-1)*procgridsize; } } MPI_Scatterv(globalptr, sendcounts, displs, subarrtype, &(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_INT, 0, MPI_COMM_WORLD); /* now all processors print their local data: */ for (p=0; p<size; p++) { if (rank == p) { printf("Local process on rank %d is:\n", rank); for (i=0; i<gridsize/procgridsize; i++) { putchar('|'); for (j=0; j<gridsize/procgridsize; j++) { printf("%2d ", local[i][j]); } printf("|\n"); } } MPI_Barrier(MPI_COMM_WORLD); } /* now each processor has its local array, and can process it */ for (i=0; i<gridsize/procgridsize; i++) { for (j=0; j<gridsize/procgridsize; j++) { local[i][j] += 1; // increase by one the value } } /* it all goes back to process 0 */ MPI_Gatherv(&(local[0][0]), gridsize*gridsize/(procgridsize*procgridsize), MPI_INT, globalptr, sendcounts, displs, subarrtype, 0, MPI_COMM_WORLD); /* don't need the local data anymore */ free2D(&local); /* or the MPI data type */ MPI_Type_free(&subarrtype); if (rank == 0) { printf("Processed grid:\n"); for (i=0; i<gridsize; i++) { for (j=0; j<gridsize; j++) { printf("%2d ", global[i][j]); } printf("\n"); } free2D(&global); } MPI_Finalize(); return 0; } 

输出:

 linux16:>mpicc -o main main.c linux16:>mpiexec -n 4 main Global array is: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Local process on rank 0 is: | 1 2 | | 5 6 | Local process on rank 1 is: | 3 4 | | 7 8 | Local process on rank 2 is: | 9 10 | |13 14 | Local process on rank 3 is: |11 12 | |15 16 | Processed grid: 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17