2016-05-26 3 views
1

Для этого DASK кода:запуская последовательность параллельных задач

def inc(x): 
    return x + 1 

for x in range(5): 
    array[x] = delay(inc)(x) 

Я хочу, чтобы получить доступ ко всем элементам в array пути выполнения просроченных задач. Но я не могу позвонить array.compute(), так как array не является функцией. Если я

for x in range(5): 
    array[x].compute() 

затем выполняет каждая задача запускается на выполнение параллельно или же a[1] уволят только после того, как a[0], конечен? Есть ли лучший способ написать этот код?

+0

удалит мой комментарий я неправильно понял вопрос. –

ответ

0

Легко определить, выполняются ли параллели, если вы вынуждаете их занять много времени. Если вы запустите этот код:

from time import sleep, time 
from dask import delayed 

start = time() 

def inc(x): 
    sleep(1) 
    print('[inc(%s): %s]' % (x, time() - start)) 
    return x + 1 

array = [0] * 5 
for x in range(5): 
    array[x] = delayed(inc)(x) 

for x in range(5): 
    array[x].compute() 

Совершенно очевидно, что вызовы происходят последовательно. Однако, если вы замените последний цикл следующим образом:

delayed(array).compute() 

, то вы можете видеть, что они находятся параллельно. На моей машине вывод выглядит следующим образом:

[inc(1): 1.00373506546] 
[inc(4): 1.00429320335] 
[inc(2): 1.00471806526] 
[inc(3): 1.00475406647] 
[inc(0): 2.00795912743] 

Очевидно, что первые четыре выполняемые задачи были параллельны. Предположительно, параллелизм по умолчанию устанавливается на количество ядер на машине, потому что для задач с интенсивным использованием ЦП обычно не так полезно иметь больше.

+0

@JRR обновил мой ответ. Я предлагаю вам использовать ответ MRocklin, потому что он больше похож на более законный метод, но выполняет тот же тест. –

1

Вы можете использовать функцию dask.compute вычислить множество запаздывающих значений сразу

from dask import delayed, compute 

array = [delayed(inc)(i) for i in range(5)] 
result = compute(*array) 
Смежные вопросы