用MPI_Gather在Fortran中发送二维数组

我想使用MPI_GATHER发送2d数据块。例如,我在每个节点上都有2×3数组,如果我有4个节点,我想在根上使用8×3数组。 对于一维数组MPI_GATHER根据MPIsorting数据sorting,但对于二维数据它创build混乱! 整理大块的干净方法是什么?

我期待这个代码的输出:

program testmpi use mpi implicit none integer :: send (2,3) integer :: rec (4,3) integer :: ierror,my_rank,i,j call MPI_Init(ierror) MPI_DATA_TYPE type_col ! find out process rank call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierror) if (my_rank==0) then send=1 do i=1,2 print*,(send(i,j),j=1,3) enddo endif if (my_rank==1) then send=5 ! do 1,2 ! print*,(send(i,j),j=1,3) ! enddo endif call MPI_GATHER(send,6,MPI_INTEGER,rec,6,MPI_INTEGER,0,MPI_COMM_WORLD,ierror) if (my_rank==0) then print*,'<><><><><>rec' do i=1,4 print*,(rec(i,j),j=1,3) enddo endif call MPI_Finalize(ierror) end program testmpi 

是这样的:

  1 1 1 1 1 1 5 5 5 5 5 5 

但是看起来像这样:

  1 1 5 1 1 5 1 5 5 1 5 5 

下面是这个答案的文字Fortran翻译。 我以为这是不必要的,但数组索引和内存布局的多重差异可能意味着它是值得做一个Fortran版本。

首先让我说你一般不想这么做 – 从一些“主”过程中分散和收集大量的数据。 通常情况下,您希望每个任务都能摆脱自己的难题,并且您应该致力于永远不要让一个处理器需要全局数据的“全局视图” 只要你需要,你限制了可扩展性和问题的大小。 如果你正在为I / O做这件事 – 一个进程读取数据,然后分散它,然后把它收集起来写入,你最终会想看看MPI-IO。

但是,解决你的问题,MPI有非常好的方式来将任意数据从内存中取出,并将其分散/收集到一组处理器中。 不幸的是,这需要相当数量的MPI概念 – MPItypes,范围和集体操作。 MPI_Type_create_subarray和MPI_Gather在这个问题的答案中讨论了很多基本思想。

考虑一个1d的整数全局数组,任务0拥有你想要分配给多个MPI任务的权限,这样他们每个人都可以在本地数组中获得一块。 假设你有4个任务,全局数组是[0,1,2,3,4,5,6,7]。 你可以让任务0发送四条消息(包括一条本身)来分发这个消息,当它重新组装时,接收四条消息将它们捆绑在一起; 但是在大量的stream程中显然会非常耗时。 有这些操作的优化例程 – 分散/收集操作。 所以在这个案例中,你可以这样做:

 integer, dimension(8) :: global ! only root has this integer, dimension(2) :: local ! everyone has this integer, parameter :: root = 0 integer :: rank, comsize integer :: i, ierr call MPI_Init(ierr) call MPI_Comm_size(MPI_COMM_WORLD, comsize, ierr) call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierr) if (rank == root) then global = [ (i, i=1,8) ] endif call MPI_Scatter(global, 2, MPI_INTEGER, & ! send everyone 2 ints from global local, 2, MPI_INTEGER, & ! each proc recieves 2 into root, & ! sending process is root, MPI_COMM_WORLD, ierr) ! all procs in COMM_WORLD participate 

之后,处理器的数据看起来就像

 task 0: local:[1,2] global: [1,2,3,4,5,6,7,8] task 1: local:[3,4] global: [garbage] task 2: local:[5,6] global: [garbage] task 3: local:[7,8] global: [garbage] 

也就是说,分散操作采用全局数组并向所有处理器发送连续的2-int块。

为了重新组装数组,我们使用MPI_Gather()操作,该操作完全相同,但是相反:

 local = local + rank call MPI_Gather (local, 2, MPI_INTEGER, & ! everyone sends 2 ints from local global, 2, MPI_INTEGER, & ! root receives 2 ints each proc into global root, & ! receiving process is root, MPI_COMM_WORLD, ierr) ! all procs in COMM_WORLD participate 

现在arrays看起来像:

 task 0: local:[1,2] global: [1,2,4,5,7,8,10,11] task 1: local:[4,5] global: [garbage-] task 2: local:[7,8] global: [garbage-] task 3: local:[10,11] global: [garbage-] 

收集带回所有的数据。

如果数据点的数量没有平均分配进程数量,我们需要向每个进程发送不同数量的项目,会发生什么情况? 然后你需要一个通用的分散版本MPI_Scatterv ,它可以让你指定每个处理器的计数和位移 – 全局数组中的数据开始的地方。 所以我们假设在4个任务中有9个字符的字符数组[a,b,c,d,e,f,g,h,i],而且每个进程要分配除最后一个字符外的两个字符,那三个。 那么你需要

 character, dimension(9) :: global character, dimension(3) :: local integer, dimension(4) :: counts integer, dimension(4) :: displs if (rank == root) then global = [ (achar(i+ichar('a')), i=0,8) ] endif local = ['-','-','-'] counts = [2,2,2,3] displs = [0,2,4,6] mycounts = counts(rank+1) call MPI_Scatterv(global, counts, displs, & ! proc i gets counts(i) chars from displs(i) MPI_CHARACTER, & local, mycounts, MPI_CHARACTER, & ! I get mycounts chars into root, & ! root rank does sending MPI_COMM_WORLD, ierr) ! all procs in COMM_WORLD participate 

现在数据看起来像

 task 0: local:"ab-" global: "abcdefghi" task 1: local:"cd-" global: *garbage* task 2: local:"ef-" global: *garbage* task 3: local:"ghi" global: *garbage* 

您现在已经使用scatterv来分配不规则的数据量。 每种情况下的位移是从数组开始的两个*等级(以字符测量的;位移以散列发送的types或聚集接收的types的单位为单位,通常不以字节为单位),而计数是[2,2,2,3]。 如果它是第一个处理器,我们希望有3个字符,我们将设置计数= [3,2,2,2]和位移将是[0,3,5,7]。 Gatherv再次作品完全相同,但相反; 计数和显示数组将保持不变。

现在,对于2D来说,这有点棘手。 如果我们想发送2d数组的2d个子块,我们现在发送的数据不再是连续的。 如果我们发送(比如说)一个6x6arrays的3×3子块到4个处理器,我们发送的数据就有了漏洞:

 2D Array --------- |000|222| |000|222| |000|222| |---+---| |111|333| |111|333| |111|333| --------- Actual layout in memory [000111000111000111222333222333222333] 

(请注意,所有高性能计算归结为了解内存中数据的布局。)

如果我们想要将标记为“1”的数据发送给任务1,则需要跳过三个值,发送三个值,跳过三个值,发送三个值,跳过三个值,发送三个值。 第二个复杂情况是分区域停止和开始的地方; 注意区域“1”在区域“0”停止的地方不开始; 在区域“0”的最后一个元素之后,存储器中的下一个位置是通过区域“1”的中途。

我们首先解决第一个布局问题 – 如何提取我们想要发送的数据。 我们总是可以将所有的“0”区域数据复制到另一个连续的数组中,然后发送; 如果我们仔细地计划好了,我们甚至可以这样做,我们可以调用MPI_Scatter的结果。 但是我们宁愿不必以这种方式转换整个主数据结构。

到目前为止,我们使用的所有MPI数据types都是简单的 – MPI_INTEGER指定(比如说)连续4个字节。 但是,MPI允许您创build自己的数据types,用于描述内存中任意复杂的数据布局。 而这种情况下 – matrix的矩形子区域 – 已经足够普遍, 有一个特定的要求 。 对于上面描述的二维情况,

 integer :: newtype; integer, dimension(2) :: sizes, subsizes, starts sizes = [6,6] ! size of global array subsizes = [3,3] ! size of sub-region starts = [0,0] ! let's say we're looking at region "0" ! which begins at offset [0,0] call MPI_Type_create_subarray(2, sizes, subsizes, starts, MPI_ORDER_FORTRAN, MPI_INTEGER, newtype, ierr) call MPI_Type_commit(newtype, ierr) 

这将创build一个从全局数组中只挑出区域“0”的types。 请注意,即使在Fortran中,起始参数也是从数组开始的偏移量 (例如从0开始),而不是索引(例如,基于1的)。

我们现在可以把这一块数据发送给另一个处理器

 call MPI_Send(global, 1, newtype, dest, tag, MPI_COMM_WORLD, ierr) ! send region "0" 

并且接收进程可以将其接收到本地数组中。 请注意,接收过程,如果它只接收到一个3×3数组,不能描述它接收的是一种新types; 不再描述内存布局,因为在一行的结尾和下一行的开始之间没有大的跳跃。 相反,它只是接收一个3 * 3 = 9整数的块:

 call MPI_Recv(local, 3*3, MPI_INTEGER, 0, tag, MPI_COMM_WORLD, ierr) 

请注意,我们也可以为其他子区域执行此操作,可以通过为其他块创build不同的types(使用不同的起始数组),或者仅从特定块的第一个位置开始发送:

 if (rank == root) then call MPI_Send(global(4,1), 1, newtype, 1, tag, MPI_COMM_WORLD, ierr) call MPI_Send(global(1,4), 1, newtype, 2, tag, MPI_COMM_WORLD, ierr) call MPI_Send(global(4,4), 1, newtype, 3, tag, MPI_COMM_WORLD, ierr) local = global(1:3, 1:3) else call MPI_Recv(local, 3*3, MPI_INTEGER, 0, tag, MPI_COMM_WORLD, rstatus, ierr) endif 

现在我们已经理解了如何指定子区域,在使用分散/聚集操作之前还有一件事要讨论,那就是这些types的“大小”。 我们不能只用这些types的MPI_Scatter()(或者甚至是scatterv),因为这些types有15个整数的范围; 也就是说,在它们开始之后它们结束的位置是15个整数,并且它们结束的位置与下一个块的开始位置不匹配,所以我们不能只使用分散 – 它会select错误的地方开始发送数据到下一个处理器。

当然,我们可以使用MPI_Scatterv()来指定自己的位移,这就是我们要做的 – 除了位移是以发送types为单位的,这对我们也没有帮助。 块从(0,3,18,21)整数的偏移量开始,从全局数组开始,并且块从其开始位置结束15个整数的事实不能让我们expression那些整数倍的位移。

为了解决这个问题,MPI可以让你为这些计算的目的设置types的范围。 它不会截断types; 它只是用来确定下一个元素开始给出最后一个元素的位置。 对于像这些孔中的types,将范围设置为比内存中的距离小到实际types的结尾通常比较方便。

我们可以将程度设定为对我们来说很方便的任何事情。 我们可以将范围1整数,然后以整数为单位设置位移。 在这种情况下,我喜欢将范围设置为3个整数 – 一个子列的大小 – 这样,在块“0”之后立即开始块“1”,并且在块“ 2" 。 不幸的是,当从块“2”跳到块“3”时,它不是很好地工作,但这不能被帮助。

因此,在这种情况下分散子块,我们会做以下几点:

 integer(kind=MPI_ADDRESS_KIND) :: extent starts = [0,0] sizes = [6, 6] subsizes = [3, 3] call MPI_Type_create_subarray(2, sizes, subsizes, starts, & MPI_ORDER_FORTRAN, MPI_INTEGER, & newtype, ierr) call MPI_Type_size(MPI_INTEGER, intsize, ierr) extent = 3*intsize call MPI_Type_create_resized(newtype, 0, extent, resizedtype, ierr) call MPI_Type_commit(resizedtype, ierr) 

这里我们已经创build了和前面一样的块types,但是我们调整了它的大小。 我们没有改变types“开始”(0)的位置,但是我们改变了“结束”(3个整数)的位置。 我们之前没有提到这个,但是MPI_Type_commit需要能够使用这个types; 但是您只需要提交实际使用的最终types,而不是任何中间步骤。 完成后,您可以使用MPI_Type_free来释放提交的types。

所以现在最后,我们可以分散块:上面的数据操作有点复杂,但一旦完成,scatterv看起来就像以前一样:

 counts = 1 ! we will send one of these new types to everyone displs = [0,1,6,7] ! the starting point of everyone's data ! in the global array, in block extents call MPI_Scatterv(global, counts, displs, & ! proc i gets counts(i) types from displs(i) resizedtype, & local, 3*3, MPI_INTEGER, & ! I'm receiving 3*3 int root, MPI_COMM_WORLD, ierr) !... from (root, MPI_COMM_WORLD) 

现在我们完成了,分散,聚集和MPI派生types的一些小游览之后。

下面是一个示例代码,其中显示了收集和分散操作以及字符数组。 运行程序:

 $ mpirun -np 4 ./scatter2d global array is: 000222 000222 000222 111333 111333 111333 Rank 0 received: 000 000 000 Rank 1 received: 111 111 111 Rank 2 received: 222 222 222 Rank 3 received: 333 333 333 Rank 0 sending: 111 111 111 Rank 1 sending: 222 222 222 Rank 2 sending: 333 333 333 Rank 3 sending: 444 444 444 Root received: 111333 111333 111333 222444 222444 222444 

代码如下:

 program scatter use mpi implicit none integer, parameter :: gridsize = 6 ! size of array integer, parameter :: procgridsize = 2 ! size of process grid character, allocatable, dimension (:,:) :: global, local integer, dimension(procgridsize**2) :: counts, displs integer, parameter :: root = 0 integer :: rank, comsize integer :: localsize integer :: i, j, row, col, ierr, p, charsize integer, dimension(2) :: sizes, subsizes, starts integer :: newtype, resizedtype integer, parameter :: tag = 1 integer, dimension(MPI_STATUS_SIZE) :: rstatus integer(kind=MPI_ADDRESS_KIND) :: extent, begin call MPI_Init(ierr) call MPI_Comm_size(MPI_COMM_WORLD, comsize, ierr) call MPI_Comm_rank(MPI_COMM_WORLD, rank, ierr) if (comsize /= procgridsize**2) then if (rank == root) then print *, 'Only works with np = ', procgridsize**2, ' for now.' endif call MPI_Finalize(ierr) stop endif localsize = gridsize/procgridsize allocate( local(localsize, localsize) ) if (rank == root) then allocate( global(gridsize, gridsize) ) forall( col=1:procgridsize, row=1:procgridsize ) global((row-1)*localsize+1:row*localsize, & (col-1)*localsize+1:col*localsize) = & achar(ichar('0')+(row-1)+(col-1)*procgridsize) end forall print *, 'global array is: ' do i=1,gridsize print *, global(i,:) enddo endif starts = [0,0] sizes = [gridsize, gridsize] subsizes = [localsize, localsize] call MPI_Type_create_subarray(2, sizes, subsizes, starts, & MPI_ORDER_FORTRAN, MPI_CHARACTER, & newtype, ierr) call MPI_Type_size(MPI_CHARACTER, charsize, ierr) extent = localsize*charsize begin = 0 call MPI_Type_create_resized(newtype, begin, extent, resizedtype, ierr) call MPI_Type_commit(resizedtype, ierr) counts = 1 ! we will send one of these new types to everyone forall( col=1:procgridsize, row=1:procgridsize ) displs(1+(row-1)+procgridsize*(col-1)) = (row-1) + localsize*procgridsize*(col-1) endforall call MPI_Scatterv(global, counts, displs, & ! proc i gets counts(i) types from displs(i) resizedtype, & local, localsize**2, MPI_CHARACTER, & ! I'm receiving localsize**2 chars root, MPI_COMM_WORLD, ierr) !... from (root, MPI_COMM_WORLD) do p=1, comsize if (rank == p-1) then print *, 'Rank ', rank, ' received: ' do i=1, localsize print *, local(i,:) enddo endif call MPI_Barrier(MPI_COMM_WORLD, ierr) enddo local = achar( ichar(local) + 1 ) do p=1, comsize if (rank == p-1) then print *, 'Rank ', rank, ' sending: ' do i=1, localsize print *, local(i,:) enddo endif call MPI_Barrier(MPI_COMM_WORLD, ierr) enddo call MPI_Gatherv( local, localsize**2, MPI_CHARACTER, & ! I'm sending localsize**2 chars global, counts, displs, resizedtype,& root, MPI_COMM_WORLD, ierr) if (rank == root) then print *, ' Root received: ' do i=1,gridsize print *, global(i,:) enddo endif call MPI_Type_free(newtype,ierr) if (rank == root) deallocate(global) deallocate(local) call MPI_Finalize(ierr) end program scatter 

所以这是一般的解决scheme。 对于你的特定情况,我们只是按行添加,我们不需要一个Gatherv,我们可以使用一个聚集,因为在这种情况下,所有的位移是相同的 – 在2d块的情况下,我们有一个stream离失所,然后随着你越过下一个街区而跳跃。 在这里,位移总是比前一个位移一个,所以我们不需要明确地给出位移。 所以最终的代码如下所示:

 program testmpi use mpi implicit none integer, dimension(:,:), allocatable :: send, recv integer, parameter :: nsendrows = 2, nsendcols = 3 integer, parameter :: root = 0 integer :: ierror, my_rank, comsize, i, j, ierr integer :: blocktype, resizedtype integer, dimension(2) :: starts, sizes, subsizes integer (kind=MPI_Address_kind) :: start, extent integer :: intsize call MPI_Init(ierror) call MPI_Comm_rank(MPI_COMM_WORLD, my_rank, ierror) call MPI_Comm_size(MPI_COMM_WORLD, comsize, ierror) allocate( send(nsendrows, nsendcols) ) send = my_rank if (my_rank==root) then ! we're going to append the local arrays ! as groups of send rows allocate( recv(nsendrows*comsize, nsendcols) ) endif ! describe what these subblocks look like inside the full concatenated array sizes = [ nsendrows*comsize, nsendcols ] subsizes = [ nsendrows, nsendcols ] starts = [ 0, 0 ] call MPI_Type_create_subarray( 2, sizes, subsizes, starts, & MPI_ORDER_FORTRAN, MPI_INTEGER, & blocktype, ierr) start = 0 call MPI_Type_size(MPI_INTEGER, intsize, ierr) extent = intsize * nsendrows call MPI_Type_create_resized(blocktype, start, extent, resizedtype, ierr) call MPI_Type_commit(resizedtype, ierr) call MPI_Gather( send, nsendrows*nsendcols, MPI_INTEGER, & ! everyone send 3*2 ints recv, 1, resizedtype, & ! root gets 1 resized type from everyone root, MPI_COMM_WORLD, ierr) if (my_rank==0) then print*,'<><><><><>recv' do i=1,nsendrows*comsize print*,(recv(i,j),j=1,nsendcols) enddo endif call MPI_Finalize(ierror) end program testmpi 

用3个过程运行这个过程给出:

 $ mpirun -np 3 ./testmpi <><><><><>recv 0 0 0 0 0 0 1 1 1 1 1 1 2 2 2 2 2 2