MPI-3 非阻塞集合通信

在上一篇中我们概要地介绍了最新的 MPI-3 标准中引进的新特性,mpi4py 3.0.0 支持 MPI-3 的很多新特性,我们将在后面逐步介绍,下面我们首先介绍 mpi4py 中的非阻塞集合通信。

在前面我们介绍了 mpi4py 中的集合通信,不过前面介绍的是 MPI-1 和 MPI-2 标准下的集合通信方法,所有这些方法都是阻塞式的,在通信没有完成之前,这些方法不会返回,因此无法进行后面的计算任务。MPI-3 标准引进了所有这些集合通信方法的非阻塞版本,mpi4py 中的非阻塞集合通信方法与其对应的阻塞版本有着完全一样的方法接口,不同的是这些非阻塞方法会返回一个 MPI.Request 对象,然后可以通过该对象的 Test 或者 Wait 等方法来测试或等待通信的完成,这部分与非阻塞的点到点通信的测试和等待是一样的。非阻塞集合通信也可以防止死锁,并通过将通信和计算重叠而提高程序的运行效率。

方法接口

下面给出非阻塞集合通信的方法接口。

MPI.Comm.Ibcast(self, buf, int root=0)

非阻塞广播操作。对应阻塞版本的 MPI.Comm.Bcast,返回 MPI.Request 对象。

MPI.Comm.Iscatter(self, sendbuf, recvbuf, int root=0)

非阻塞发散操作。对应阻塞版本的 MPI.Comm.Scatter,返回 MPI.Request 对象。

MPI.Comm.Iscatterv(self, sendbuf, recvbuf, int root=0)

非阻塞向量发散操作。对应阻塞版本的 MPI.Comm.Scatterv,返回 MPI.Request 对象。

MPI.Comm.Igather(self, sendbuf, recvbuf, int root=0)

非阻塞收集操作。对应阻塞版本的 MPI.Comm.Gather,返回 MPI.Request 对象。

MPI.Comm.Igatherv(self, sendbuf, recvbuf, int root=0)

非阻塞向量收集操作。对应阻塞版本的 MPI.Comm.Gatherv,返回 MPI.Request 对象。

MPI.Comm.Ireduce(self, sendbuf, recvbuf, Op op=SUM, int root=0)

非阻塞规约操作。对应阻塞版本的 MPI.Comm.Reduce,返回 MPI.Request 对象。

MPI.Comm.Iallgather(self, sendbuf, recvbuf)

非阻塞全收集操作。对应阻塞版本的 MPI.Comm.Allgather,返回 MPI.Request 对象。

MPI.Comm.Iallgatherv(self, sendbuf, recvbuf)

非阻塞向量全收集操作。对应阻塞版本的 MPI.Comm.Allgatherv,返回 MPI.Request 对象。

MPI.Comm.Iallreduce(self, sendbuf, recvbuf, Op op=SUM)

非阻塞全规约操作。对应阻塞版本的 MPI.Comm.Allreduce,返回 MPI.Request 对象。

MPI.Comm.Ireduce_scatter_block(self, sendbuf, recvbuf, Op op=SUM)

非阻塞非向量规约发散操作。对应阻塞版本的 MPI.Comm.Reduce_scatter_block,返回 MPI.Request 对象。

MPI.Comm.Ireduce_scatter(self, sendbuf, recvbuf, recvcounts=None, Op op=SUM)

非阻塞向量规约发散操作。对应阻塞版本的 MPI.Comm.Reduce_scatter,返回 MPI.Request 对象。

MPI.Comm.Ialltoall(self, sendbuf, recvbuf)

非阻塞全发散操作。对应阻塞版本的 MPI.Comm.Alltoall,返回 MPI.Request 对象。

MPI.Comm.Ialltoallv(self, sendbuf, recvbuf)

非阻塞向量全发散操作。对应阻塞版本的 MPI.Comm.Alltoallv,返回 MPI.Request 对象。

MPI.Comm.Ialltoallw(self, sendbuf, recvbuf)

非阻塞向量全发散操作。对应阻塞版本的 MPI.Comm.Alltoallw,返回 MPI.Request 对象。

MPI.Intracomm.Iscan(self, sendbuf, recvbuf, Op op=SUM)

非阻塞扫描操作。对应阻塞版本的 MPI.Comm.Scan,返回 MPI.Request 对象。注意:此方法只在组内通信子上有定义。

MPI.Intracomm.Iexscan(self, sendbuf, recvbuf, Op op=SUM)

非阻塞前缀扫描操作。对应阻塞版本的 MPI.Comm.Exscan,返回 MPI.Request 对象。注意:此方法只在组内通信子上有定义。

MPI.Comm.Ibarrier(self)

非阻塞栅障同步操作。对应阻塞版本的 MPI.Comm.Barrier,返回 MPI.Request 对象。

例程

下面给出部分非阻塞集合操作的使用例程。

# nbc.py

"""
Demonstrates nonblocking collective communication.

Run this with 4 processes like:
$ mpiexec -n 4 python nbc.py
"""

import numpy as np
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# ------------------------------------------------------------------------
# broadcast a numpy array by using Ibcast
if rank == 0:
    ary = np.arange(10, dtype='i')
else:
    ary = np.empty(10, dtype='i')

req = comm.Ibcast(ary, root=0)
req.Wait()
print 'Ibcast: rank %d has %s' % (rank, ary)


# ------------------------------------------------------------------------
# scatter a numpy array by using Iscatterv
if rank == 0:
    send_buf = np.arange(10, dtype='i')
    recv_buf = np.empty(4, dtype='i')
elif rank == 1:
    send_buf = None
    recv_buf = np.empty(3, dtype='i')
elif rank == 2:
    send_buf = None
    recv_buf = np.empty(2, dtype='i')
else:
    send_buf = None
    recv_buf = np.empty(1, dtype='i')
count = [4, 3, 2, 1]
displ = [0, 4, 7, 9]

req = comm.Iscatterv([send_buf, count, MPI.INT], recv_buf, root=0)
req.Wait()
print 'Iscatterv: rank %d has %s' % (rank, recv_buf)


# ------------------------------------------------------------------------
# Ialltoall
send_buf = np.arange(8, dtype='i')
recv_buf = np.empty(8, dtype='i')
req = comm.Ialltoall(send_buf, recv_buf)
req.Wait()
print 'Ialltoall: rank %d has %s' % (rank, recv_buf)

运行结果如下:

$ mpiexec -n 4 python nbc.py
Ibcast: rank 0 has [0 1 2 3 4 5 6 7 8 9]
Iscatterv: rank 0 has [0 1 2 3]
Ialltoall: rank 0 has [0 1 0 1 0 1 0 1]
Ibcast: rank 1 has [0 1 2 3 4 5 6 7 8 9]
Iscatterv: rank 1 has [4 5 6]
Ialltoall: rank 1 has [2 3 2 3 2 3 2 3]
Ibcast: rank 2 has [0 1 2 3 4 5 6 7 8 9]
Iscatterv: rank 2 has [7 8]
Ialltoall: rank 2 has [4 5 4 5 4 5 4 5]
Ibcast: rank 3 has [0 1 2 3 4 5 6 7 8 9]
Iscatterv: rank 3 has [9]
Ialltoall: rank 3 has [6 7 6 7 6 7 6 7]

非阻塞栅障同步——Ibarrier

非阻塞集合通信方法除 Ibarrier 外都比较容易理解,因为它们同其对应的阻塞版本有着同样的语义。但是非阻塞的栅障同步操作 Ibarrier 却让人感到有点匪夷所思。因为 Ibarrier 是非阻塞的,并不会让运行快的进程停在此等待其它进程,表面上看似乎起不到让进程同步的作用。实际上非阻塞的栅障同步 Ibarrier 是一个非常有用的操作,合适地使用它可以大大提高程序的计算效率。Ibarrier 虽然不会让进程阻塞下来等待,但是到达 Ibarrier 的进程会宣告自己已经到达了需要同步的执行点,然后可以去做其它不依赖于此同步的工作,而不必像在阻塞同步中那样在此白白等待,其可以周期性地通过 Test 等方法来检测是否所有的进程都已经到达需要同步的点,只要还有进程没有到达该点,Test 的结果就会为 False,一旦所有进程都到达了同步点,Test 的结果就会为 True,表示完成了所需的同步工作,所有进程可以进行后续依赖于此同步的计算任务。

打个简单的比方,几个人商量好 9 点在某会议室开会,只有等所有人都到齐后才会举行会议,有的人可能会早于 9 点到达,有的人可能会迟到,在阻塞同步的情况下,先到的人都会在会议室什么事都不做白白等着最后一个到的人,而在非阻塞同步情况下,先到的人可以在会议室签个到或留张小纸条以表明自己已经到了,然后可以到附近溜达溜达,比如说喝杯咖啡或做点其它与会议无关的事情,并每过一段时间回来看看签到表或小纸条是否所有人都已到齐,只要还有人没到,就可以接着做点自己的事情,一旦所有人都已到齐,会议就可以举行了。

下面给出非阻塞栅障同步操作的简单示例。

# Ibarrier.py

"""
Demonstrates the usage of Ibarrier()

Run this with 4 processes like:
$ mpiexec -n 4 python Ibarrier.py
"""

import time
import random
from mpi4py import MPI


comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# synchronize here by blocking barrier
comm.Barrier()

# each process sleep for a random of time
time.sleep(random.random() / 100000)

# nonblocking barrier
req = comm.Ibarrier()

cnt = 0
while(not req.Test()):
    # do some work until all processes reach the Ibarrier
    print 'rank %d: %d' % (rank, cnt)
    cnt += 1

# do other things depend on this Ibarrier
# ...

运行结果如下:

$ mpiexec -n 4 python Ibarrier.py
rank 3: 0
rank 0: 0
rank 0: 1
rank 0: 2
rank 0: 3
rank 1: 0
rank 2: 0
rank 2: 1
rank 2: 2

在以上例程中,由于每个进程 sleep 的时间不同,因此到达 Ibarrier 的时间也不同,但是因为 Ibarrier 是非阻塞的,因此先到达的进程并不会阻塞在此等待其它进程,而是会立即返回一个 MPI.Request 对象 req 并执行后续的计算工作(此处进入 while 循环输出循环次数),但是只要还有进程没有到达同步位置点 Ibarrier,req.Test() 就会返回 False,只有当所有进程都执行完 sleep 到达 Ibarrier 位置,req.Test() 的结果才会为 True,所有进程才会退出 while 循环。在非阻塞同步的过程中,运行快的进程(此处即 sleep 时间少的进程)不会等待运行慢的进程,而是会利用此时间做更多其它的计算任务(此处输出更多的循环计数)。可见非阻塞的栅障同步操作可以避免运行快的进程的不必要的等待时间以提高程序的计算效率。

以上我们介绍了 mpi4py 中的非阻塞集合通信方法,在下一篇中我们将介绍 mpi4py 中的近邻集合通信方法。

你可能感兴趣的