2015-05-17 1 views
2

Я хотел бы интегрировать систему дифференциальных уравнений для нескольких комбинаций параметров с использованием многопроцессорного модуля Python. Таким образом, система должна быть интегрирована, и комбинация параметров должна быть сохранена, а также ее индекс и конечное значение одной из переменных.Многопроцессорность в Python: как реализовать цикл над «apply_async» как «map_async» с помощью функции обратного вызова

Хотя это прекрасно работает, когда я использую apply_async - который уже быстрее, чем делать это в простой для цикла - я не в состоянии реализовать то же самое, используя map_async, который, кажется, быстрее, чем apply_async. Функция обратного вызова никогда не вызывается, и я не знаю, почему. Может ли кто-нибудь объяснить, почему это происходит и как получить тот же результат, используя map_async вместо apply_async?!

Вот мой код:

from pylab import * 
import multiprocessing as mp 
from scipy.integrate import odeint 
import time 

#my system of differential equations 
def myODE (yn,tvec,allpara): 

    (x, y, z) = yn 

    a, b = allpara['para'] 

    dx = -x + a*y + x*x*y 
    dy = b - a*y - x*x*y 
    dz = x*y 

    return (dx, dy, dz) 

#returns the index of the parameter combination, the parameters and the integrated solution 
#this way I know which parameter combination belongs to which outcome in the asynch-case 
def runMyODE(yn,tvec,allpara): 
    return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,))) 

#for reproducibility  
seed(0) 

#time settings for integration 
dt = 0.01 
tmax = 50 
tval = arange(0,tmax,dt) 

numVar = 3 #number of variables (x, y, z) 
numPar = 2 #number of parameters (a, b) 
numComb = 5 #number of parameter combinations 

INIT = zeros((numComb,numVar)) #initial conditions will be stored here 
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here 

#create some initial conditions and random parameters 
for combi in range(numComb): 

    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0 

    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly 

#################################using loop over apply#################### 

#results will be stored in here 
asyncResultsApply = [] 

#my callback function 
def saveResultApply(result): 
    # storing the index, a, b and the final value of z 
    asyncResultsApply.append((result[0], result[1], result[2][2,-1])) 

#start the multiprocessing part 
pool = mp.Pool(processes=4) 
for combi in range(numComb): 
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply) 
pool.close() 
pool.join() 

for res in asyncResultsApply: 
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z 

#######################################using map##################### 
#the only difference is that the for loop is replaced by a "map_async" call 
print "\n\nnow using map\n\n" 
asyncResultsMap = [] 

#my callback function which is never called 
def saveResultMap(result): 
    # storing the index, a, b and the final value of z 
    asyncResultsMap.append((result[0], result[1], result[2][2,-1])) 

pool = mp.Pool(processes=4) 
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap) 
pool.close() 
pool.join() 

#this does not work yet 
for res in asyncResultsMap: 
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z 

ответ

1

Если я вас правильно понял, это связано с чем-то, что сбивает с толку людей довольно часто. Обратный вызов apply_async вызывается после одного операнда, но так же map - он не вызывает обратный вызов для каждого элемента, а скорее один раз на весь результат.

Вы правильнее отметить, что map быстрее, чем apply_async s. Если вы хотите что-то должно произойти после каждого результата, есть несколько способов пойти:

  1. Вы можете эффективно добавить функцию обратного вызова для операции, которую вы хотите быть выполнены на каждом элементе, и map с помощью этого.

  2. Вы можете использовать imap (или imap_unordered) в цикле и выполнить обратный вызов внутри тела цикла. Конечно, это означает, что все будет выполняться в родительском процессе, но характер вещей, написанных как обратные вызовы, означает, что это обычно не проблема (это, как правило, дешевые функции). YMMV.


Например, предположим, что у вас есть функции f и cb, и вы хотели бы mapf на escb с каждого оп. Тогда вы можете либо сделать:

def look_ma_no_cb(e): 
    r = f(e) 
    cb(r) 
    return r 

p = multiprocessing.Pool() 
p.map(look_ma_no_cb, es) 

или

for r in p.imap(f, es): 
    cb(r) 
+0

Ok, спасибо за ваш ответ, но как бы я использовать «map_async» в этом случае, чтобы получить те же результаты, как и для apply_async? Не могли бы вы рассказать об этом, пожалуйста? «Вы можете эффективно добавить обратный вызов к операции, которую вы хотите выполнить для каждого элемента, и сопоставить ее с этим». Я думал, что это произойдет в строке «pool.map_async (lambda combi: runMyODE (INIT [combi ,:], tval, para = PARA [combi ,:]), range (numComb), callback = saveResultMap)" Не могли бы вы добавить/пересмотреть код, чтобы «map_async» работал в приведенном выше примере, пожалуйста ?! – Cleb

+0

ОК, я добавлю пояснения. –

+0

Было бы здорово, спасибо! Если вы создадите этот пример, используя «map_async», я затем приму ваш ответ. – Cleb

Смежные вопросы