2016-02-12 2 views
2

Я хотел бы собрать содержимое массива numpy от всех процессоров до одного. Если все массивы имеют одинаковый размер, это работает. Однако я не вижу естественного способа выполнения одной и той же задачи для массивов прозависимого размера. Пожалуйста, обратите внимание на следующий код:Нечетные массивы numpy send/receive

from mpi4py import MPI 
import numpy 

comm = MPI.COMM_WORLD 
rank = comm.rank 
size = comm.size 

if rank >= size/2: 
    nb_elts = 5 
else: 
    nb_elts = 2 

# create data 
lst = [] 
for i in xrange(nb_elts): 
    lst.append(rank*3+i) 
array_lst = numpy.array(lst, dtype=int) 

# communicate array 
result = [] 
if rank == 0: 
    result = array_lst 
    for p in xrange(1, size): 
     received = numpy.empty(nb_elts, dtype=numpy.int) 
     comm.Recv(received, p, tag=13) 
     result = numpy.concatenate([result, received]) 
else: 
    comm.Send(array_lst, 0, tag=13) 

Моя проблема заключается в «полученном» распределении. Как я могу узнать, какой размер будет выделен? Должен ли я сначала отправлять/получать размер каждого массива?

Основываясь на предложение ниже, я пойду с

data_array = numpy.ones(rank + 3, dtype=int) 
data_array *= rank + 5 
print '[{}] data: {} ({})'.format(rank, data_array, type(data_array)) 

# make all processors aware of data array sizes 
all_sizes = {rank: data_array.size} 
gathered_all_sizes = comm_py.allgather(all_sizes) 
for d in gathered_all_sizes: 
    all_sizes.update(d) 

# prepare Gatherv as described by @francis 
nbsum = 0 
sendcounts = [] 
displacements = [] 
for p in xrange(size): 
    n = all_sizes[p] 
    displacements.append(nbsum) 
    sendcounts.append(n) 
    nbsum += n 

if rank==0: 
    result = numpy.empty(nbsum, dtype=numpy.int) 
else: 
    result = None 

comm_py.Gatherv(data_array,[result, tuple(sendcounts), tuple(displacements), MPI.INT64_T], root=0) 

print '[{}] gathered data: {}'.format(rank, result) 

ответ

2

В коде вы вставили, как Send() и Recv() посылает nb_elts элементы. Проблема заключается в том, что nb_elts не то же самое для каждого процесса ... Таким образом, количество товара, полученных не совпадает с числом элементов, которые были отправлены и программа жалуется:

mpi4py.MPI.Exception: MPI_ERR_TRUNCATE : сообщение усечено

Чтобы предотвратить это, корневой процесс должен вычислить количество элементов, отправленных другими процессами. Следовательно, в петле for p in xrange(1, size), nb_elts должен быть вычислен в соответствии с p, а не rank.

Исправлен следующий код на основе вашего. Я бы добавил, что естественным способом выполнения этой операции сбора является использование Gatherv(). См., Например, http://materials.jeremybejarano.com/MPIwithPython/collectiveCom.html и . Я добавил соответствующий пример кода. Единственным сложным моментом является то, что numpy.int имеет длину 64 бит. Следовательно, Gatherv() использует тип MPI MPI_DOUBLE.

from mpi4py import MPI 
import numpy 

comm = MPI.COMM_WORLD 
rank = comm.rank 
size = comm.size 

if rank >= size/2: 
    nb_elts = 5 
else: 
    nb_elts = 2 

# create data 
lst = [] 
for i in xrange(nb_elts): 
    lst.append(rank*3+i) 
array_lst = numpy.array(lst, dtype=int) 

# communicate array 
result = [] 
if rank == 0: 
    result = array_lst 
    for p in xrange(1, size): 

     if p >= size/2: 
      nb_elts = 5 
     else: 
      nb_elts = 2 

     received = numpy.empty(nb_elts, dtype=numpy.int) 
     comm.Recv(received, p, tag=13) 
     result = numpy.concatenate([result, received]) 
else: 
    comm.Send(array_lst, 0, tag=13) 

if rank==0: 
    print "Send Recv, result= "+str(result) 

#How to use Gatherv: 
nbsum=0 
sendcounts=[] 
displacements=[] 

for p in xrange(0,size): 
    displacements.append(nbsum) 
    if p >= size/2: 
      nbsum+= 5 
      sendcounts.append(5) 
    else: 
      nbsum+= 2 
      sendcounts.append(2) 

if rank==0: 
    print "nbsum "+str(nbsum) 
    print "sendcounts "+str(tuple(sendcounts)) 
    print "displacements "+str(tuple(displacements)) 
print "rank "+str(rank)+" array_lst "+str(array_lst) 
print "numpy.int "+str(numpy.dtype(numpy.int))+" "+str(numpy.dtype(numpy.int).itemsize)+" "+str(numpy.dtype(numpy.int).name) 

if rank==0: 
    result2=numpy.empty(nbsum, dtype=numpy.int) 
else: 
    result2=None 

comm.Gatherv(array_lst,[result2,tuple(sendcounts),tuple(displacements),MPI.DOUBLE],root=0) 

if rank==0: 
    print "Gatherv, result2= "+str(result2) 
+0

Спасибо, Фрэнсис. Я не мог использовать ваш код напрямую, так как в моем случае каждый процессор знает только свой размер массива. Я добавил обновленную версию кода на ваше предложение Gatherv на мой вопрос. –

+0

Добро пожаловать! Обратите внимание, что аргументы 'recvcounts',' disps' и 'recvtype' имеют значение только у root. См. Https://www.open-mpi.org/doc/v1.8/man3/MPI_Gatherv.3.php. Следовательно, вполне вероятно, что 'gather()' количества отправленных элементов является достаточным и 'all_gather()' overkill. Попробуйте передать пустой список '[]' другим процессам в 'gatherv()': он работает. – francis

Смежные вопросы