1

Я хотел бы интегрировать систему дифференциальных уравнений с использованием множества различных комбинаций параметров и сохранить конечные значения переменных которые принадлежат определенному набору параметров. Поэтому я реализовал простой цикл for, в котором созданы случайные начальные условия и комбинации параметров, система интегрирована и интересующие значения хранятся в соответствующих массивах. Поскольку я намерен сделать это для многих комбинаций параметров для довольно сложной системы (здесь я использую только игрушечную систему для иллюстрации), которая также может стать жесткой, я бы хотел распараллелить симуляции, чтобы ускорить процесс, используя многопроцессорность Python ".Многопроцессорность Python: ускорение цикла for для нескольких наборов параметров, «применить» и «apply_async»

Однако, когда я запускаю симуляции, цикл for всегда быстрее, чем его параллельная версия. Единственный способ быть быстрее, чем for-loop, который я нашел до сих пор, - использовать «apply_async» вместо «apply». Для получения 10 различных комбинаций параметров, я, например, следующий вывод (используя код снизу):

The for loop took 0.11986207962 seconds! 
[ 41.75971761 48.06034375 38.74134139 25.6022232 46.48436046 
    46.34952734 50.9073202 48.26035086 50.05026187 41.79483135] 
Using apply took 0.180637836456 seconds! 
41.7597176061 
48.0603437545 
38.7413413879 
25.6022231983 
46.4843604574 
46.3495273394 
50.9073202011 
48.2603508573 
50.0502618731 
41.7948313502 
Using apply_async took 0.000414133071899 seconds! 
41.7597176061 
48.0603437545 
38.7413413879 
25.6022231983 
46.4843604574 
46.3495273394 
50.9073202011 
48.2603508573 
50.0502618731 
41.7948313502 

Хотя в этом примере порядок результатов одинаковы для «применить» и «apply_async», это кажется не быть правдой в целом. Поэтому я хотел бы использовать «apply_async», поскольку он намного быстрее, но в этом случае я не знаю, как я могу сопоставить результат моделирования с параметрами/начальными условиями, которые я использовал для соответствующего моделирования.

Мои вопросы таким образом:

1) Почему «применить» много slowlier, чем просто для цикла в этом случае?

2) Когда я использую «apply_async» вместо «apply», параллелизованная версия становится намного быстрее, чем for-loop, но как я могу затем сопоставить результат моделирования с параметрами, которые я использовал в соответствующем имитации ?

3) В этом случае результаты «apply» и «apply_async» имеют одинаковый порядок. Почему это? Совпадение?

Мой код можно найти ниже:

from pylab import * 
import multiprocessing as mp 
from scipy.integrate import odeint 
import time 

#my system of differential equations 
def myODE (yn,tvec,allpara): 

    (x, y, z) = yn 

    a, b = allpara['para'] 

    dx = -x + a*y + x*x*y 
    dy = b - a*y - x*x*y 
    dz = x*y 

    return (dx, dy, dz) 

#for reproducibility  
seed(0) 

#time settings for integration 
dt = 0.01 
tmax = 50 
tval = arange(0,tmax,dt) 

numVar = 3 #number of variables (x, y, z) 
numPar = 2 #number of parameters (a, b) 
numComb = 10 #number of parameter combinations 

INIT = zeros((numComb,numVar)) #initial conditions will be stored here 
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here 
RES = zeros(numComb) #z(tmax) will be stored here 

tic = time.time() 

for combi in range(numComb): 

    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0 

    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly 

    allpara = {'para': PARA[combi,:]} 

    results = transpose(odeint(myODE, INIT[combi,:], tval, args=(allpara,))) #integrate system 

    RES[combi] = results[numVar - 1][-1] #store z 

    #INIT[combi,:] = results[:,-1] #update initial conditions 
    #INIT[combi,-1] = 0 #set z to 0 

toc = time.time() 

print 'The for loop took ', toc-tic, 'seconds!' 

print RES 

#function for the multi-processing part 
def runMyODE(yn,tvec,allpara): 

    return transpose(odeint(myODE, yn, tvec, args=(allpara,))) 

tic = time.time() 

pool = mp.Pool(processes=4) 
results = [pool.apply(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)] 

toc = time.time() 

print 'Using apply took ', toc-tic, 'seconds!' 

for sol in range(numComb): 
    print results[sol][2,-1] #print final value of z 

tic = time.time()  
resultsAsync = [pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]  
toc = time.time() 
print 'Using apply_async took ', toc-tic, 'seconds!' 

for sol in range(numComb): 
    print resultsAsync[sol].get()[2,-1] #print final value of z 
+0

Как было отмечено в текущий ответ, ваше асинхронное применение является фиктивным, поскольку вы не позволяете завершить работу, прежде чем распечатать время. Весь смысл асинхронности заключается в том, чтобы избежать блокировки вызывающего потока во время выполнения работы. Тем не менее, первое правило получения реальной выгоды от обработки параллельных циклов заключается в обеспечении того, чтобы каждый поток/задача выполнял достаточную работу. Для выполнения этих задач требуется больше потоковых потоков планирования, чем однопоточный цикл, поэтому вам приходится компенсировать эти накладные расходы, делая значительно больше на каждой итерации. –

+0

Например, если вы хотите распараллелить что-то вроде вычисления нормалей вершин, вы повредите производительность, если каждая итерация цикла вычисляет только одну вершину. Вы хотите, чтобы каждая итерация вычисляла тысячи нормалей, чтобы компенсировать накладные расходы на потоки. Напишите код таким образом, чтобы каждый поток имел очень мясистую работу, и вы начнете видеть ускорения, которые начинают становиться все более пропорциональными вашим аппаратным возможностям. –

+0

Спасибо за ваши комментарии, Айк! Моя, по-видимому, наивная идея заключалась в том, что если у меня есть определенное количество комбинаций параметров, и для цикла для меня требуется количество T для их интеграции, я получаю время ~ T/4, если я использую параллельный подход с 4 ядрами. Вы правы в том, что в этом конкретном примере вычислительные задачи не так «мясистые». Однако существуют системы, для которых случайные параметры приводят к жесткой системе, что приводит к длительному времени интегрирования. Ожидая, что такая жесткая система будет интегрирована, я хотел бы одновременно интегрировать несколько нежестких систем. – Cleb

ответ

1

Обратите внимание, что тот факт, что ваш apply_async является 289 раз быстрее, то цикл немного подозрительно! И сейчас вы гарантированно получите результаты в том порядке, в котором они отправлены, даже если это не то, что вы хотите для максимального параллелизма.

apply_async запускает задачу, она не ждет, пока она не будет завершена; .get() делает это. Таким образом:

tic = time.time()  
resultsAsync = [pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]})) for combi in range(numComb)]  
toc = time.time() 

На самом деле это не очень справедливое измерение; вы запустили все задания, но они еще не завершены.

С другой стороны, как только вы выполните .get() результаты, вы знаете, что задача завершена и у вас есть ответ; таким образом это

for sol in range(numComb): 
    print resultsAsync[sol].get()[2,-1] #print final value of z 

Означает, что наверняка у вас есть результаты в порядке (потому что вы собираетесь через объекты ApplyResult в порядке и .get() ИНГ их); но вы можете захотеть получить результаты, как только они будут готовы, а не делать блокировку ожидания по шагам по одному. Но это означает, что вам нужно будет пометить результаты своими параметрами так или иначе.

Вы можете использовать обратные вызовы для сохранения результатов после того, как задачи выполнены, и возвращаете параметры вместе с результатами, чтобы полностью асинхронными возвращается:

def runMyODE(yn,tvec,allpara): 
    return allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,))) 

asyncResults = [] 

def saveResult(result): 
    asyncResults.append((result[0], result[1][2,-1])) 

tic = time.time() 
for combi in range(numComb): 
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:]}), callback=saveResult) 
pool.close() 
pool.join() 
toc = time.time() 

print 'Using apply_async took ', toc-tic, 'seconds!' 

for res in asyncResults: 
    print res[0], res[1] 

дает вам более разумный срок; результаты все еще почти всегда в порядке, потому что задачи занимают очень одинаковое количество времени:

Using apply took 0.0847041606903 seconds! 
[ 6.02763376 5.44883183] 41.7597176061 
[ 4.37587211 8.91773001] 48.0603437545 
[ 7.91725038 5.2889492 ] 38.7413413879 
[ 0.71036058 0.871293 ] 25.6022231983 
[ 7.78156751 8.70012148] 46.4843604574 
[ 4.61479362 7.80529176] 46.3495273394 
[ 1.43353287 9.44668917] 50.9073202011 
[ 2.64555612 7.74233689] 48.2603508573 
[ 0.187898 6.17635497] 50.0502618731 
[ 9.43748079 6.81820299] 41.7948313502 
Using apply_async took 0.0259671211243 seconds! 
[ 4.37587211 8.91773001] 48.0603437545 
[ 0.71036058 0.871293 ] 25.6022231983 
[ 6.02763376 5.44883183] 41.7597176061 
[ 7.91725038 5.2889492 ] 38.7413413879 
[ 7.78156751 8.70012148] 46.4843604574 
[ 4.61479362 7.80529176] 46.3495273394 
[ 1.43353287 9.44668917] 50.9073202011 
[ 2.64555612 7.74233689] 48.2603508573 
[ 0.187898 6.17635497] 50.0502618731 
[ 9.43748079 6.81820299] 41.7948313502 

Обратите внимание, что вместо цикла по применить, вы можете также использовать карту:

pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, para=PARA[combi,:]), range(numComb), callback=saveResult) 
+0

Большое спасибо за этот подробный ответ! Я проверю это. – Cleb

+0

Я не получаю эту строку «map_async» для запуска. Все, что я делаю, это заменить строки «для combi in range (numComb): pool.apply_async (runMyODE, args = (INIT [combi,:], tval, {'para': PARA [combi ,:]}), callback = saveResult) "по строке с" pool.map_async ", но затем ничего не печатается. Какие-либо предложения?Не могли бы вы изменить свой код для этого примера ?! Благодаря! Кроме того, pool.close() и pool.join() требуется или «только» хороший стиль? – Cleb

+0

@Cleb: Похоже, в некоторых довольно недавних версиях Python есть ошибка в map_async(), где игнорируется параметр обратного вызова (это приведет к тому, что список результатов будет пустым, что вы видите). Новейшие версии (или достаточно старые!) Должны работать. Что касается close() и join(), это то, что вы хотели бы сделать, чтобы привести в порядок, а это значит, что карта будет закончена, но вы также можете легко вызвать wait() для всех результатов или (если map_async работает), получите одно возвращаемое значение из map_async и return. Также стоит попробовать просто map(). –

Смежные вопросы