2016-07-11 4 views
4

Я ищу, чтобы заполнить большой массив numpy, используя многопроцессорность. Я работал над параллельными примерами фьючерсов в документации, но не получил достаточного понимания для изменения использования.заполнить массив numpy через concurrent.futures multiprocessing

Вот упрощенная версия того, что я хотел бы сделать:

import numpy 
import concurrent.futures 

squares = numpy.empty((20, 2)) 

def make_square(i, squares): 
    print('iteration', i) 
    squares[i, 0], squares[i, 1] = i, i ** 2 

with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for i in range(20): 
     executor.submit(make_square, i, squares) 

Выход работает что-то вроде:

iteration 1 
iteration 0 
iteration 2 
iteration 3 
iteration 5 
iteration 4 
iteration 6 
iteration 7 
iteration 8 
iteration 9 
iteration 10 
iteration 11 
iteration 12 
iteration 13 
iteration 15 
iteration 14 
iteration 16 
iteration 17 
iteration 18 
iteration 19 

, который хорошо демонстрирует, что функция работает одновременно. Но массив квадратов по-прежнему пуст.

Каков правильный синтаксис заполнения массива квадратов?

Во-вторых, будет ли использование .map лучше реализовать?

Заранее благодарен!

8/2/17 Ничего себе. Поэтому я бродил по красно-коричневому краю, потому что я не получал никаких решений для этой проблемы. Так счастлив вернуться сюда в stackoverflow. Спасибо @ilia w495 никитин и @donkopotamus. Вот что я написал в reddit, который более подробно объясняет эту проблему.

The posted code is an analogy of what I'm trying to do, which is populating 
a numpy array with a relatively simple calculation (dot product) involving 
two other arrays. The algorithm depends on a value N which can be anything 
from 1 on up, though we won't likely use a value larger than 24. 

I'm currently running the algorithm on a distributed computing system and 
the N = 20 versions take longer than 10 days to complete. I'm using dozens 
of cores to obtain the required memory, but gaining none of the benefits of 
multiple CPUs. I've rewritten the code using numba which makes lower N 
variants superfast on my own laptop which can't handle the memory 
requirements for larger Ns, but alas, our distributed computing environment 
is not currently able to install numba. So I'm attempting concurrent.futures 
to take advantage of the multiple CPUs in our computing environment in the 
hopes of speeding things up. 

Так что вычисление не является интенсивным, это 16 миллионов + итераций. Инициализированным массивом является N x 2 ** N, т.е. диапазон (16777216) в приведенном выше коде.

Возможно, просто невозможно заполнить массив посредством многопроцессорной обработки.

+0

массива Ваших квадратиков пуст, потому что вы пытаетесь изменить его в отдельном ** процессы ** – donkopotamus

+0

@zazizoma Не заполнять, а инициализировать. Есть еще парадигма. общие структуры данных должны быть неизменными. Думаю, вам следует разделить ваш массив на части 'C', где' C' - количество процессоров и обрабатывать каждую часть на отдельном CPU (процессе). Затем присоединяйтесь ко всем частям, и вы получите то, что хотите. Но в некоторых случаях это неприменимо. Это зависит от вашего алгоритма. Кроме того, передача данных между процессами имеет свою собственную стоимость. Например, я буду стараться реализовать с помощью 'pymp': https://gist.github.com/w495/6d3cd6a715e3098a3a10a0479d9fbb03 С' concurrent.futures' это будет проще. –

+0

Отлично. Я также посмотрю, как запустить dot-продукт в нескольких процессорах и оставить итерации линейными. Это может помочь. ДЕЙСТВИТЕЛЬНО оцените руководство. – zazizoma

ответ

1

Проблема здесь в том, что ProcessPoolExecutor будет выполнять функцию в отдельного процесса.

Как это отдельные процессы, с отдельным пространством памяти, вы не можете ожидать, что любые изменения, которые они делают в массив (squares) будут отражены в родителе. Следовательно, ваш исходный массив не изменяется (как вы открываете).

Вам необходимо выполнить одно из следующих действий:

  • использовать ThreadPoolExecutor, но знать, в общем случае, вы все равно не должны пытаться изменить глобальные переменные в несколько потоков;
  • обновите свой код, чтобы ваш процесс/нить выполняли какие-то (дорогостоящие) вычисления и return result.

Последний будет выглядеть следующим образом:

squares = numpy.zeros((20, 2)) 

def make_square(i): 
    print('iteration', i) 

    # compute expensive data here ... 

    # return row number and the computed data 
    return i, ([i, i**2]) 

with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for row, result in executor.map(make_square, range(20)): 
     squares[row] = result 

Это даст результат, который вы ожидаете:

[[ 0. 0.] 
[ 1. 1.] 
[ 2. 4.] 
... 
[ 18. 324.] 
[ 19. 361.]] 
+0

Но в чем причина повторного использования этой (квадратов) переменной? Это не гарантирует более низкое использование памяти. Более того, это нехорошо, чтобы повторно отправлять ненужные данные через процессы. В этом случае вы можете получить 'row' через' enumerate'. –

-1

А good example, я предполагаю, что это поможет вам:

from concurrent.futures import ProcessPoolExecutor 
from time import sleep 

def return_after_5_secs(message): 
    sleep(5) 
    return message 

pool = ProcessPoolExecutor(3) 

future = pool.submit(return_after_5_secs, ("hello")) 
print(future.done()) 
sleep(2) 
print(future.done()) 
sleep(2) 
print(future.done()) 
sleep(2) 
print(future.done()) 
print("Result: " + future.result()) 

будущее - только обещают что-то сделать. Поэтому я вижу ваш код следующим образом:

import concurrent.futures 
import itertools 
import os 
import time 

import numpy 

SQUARE_LIST_SIZE = 20 


def main(): 
    # Creates empty array. 
    square_list = numpy.empty((SQUARE_LIST_SIZE, 2)) 

    # Creates a sequence (generator) of promises 
    future_seq = make_future_seq(square_list) 

    # Creates a sequence (generator) of computed square. 
    square_seq = make_square_seq(future_seq) 

    # Creates a sequence (generator) of computed square. 
    square_list = list(square_seq) 

    return square_list 


def make_future_seq(squares): 
    """ 
     Generates the sequence of empty a promises. 
     Creates a new process only on `submit`. 
    """ 

    with concurrent.futures.ProcessPoolExecutor(4) as executor: 
     for i in range(SQUARE_LIST_SIZE): 
      # Only makes a promise to do something. 
      future = executor.submit(make_one_square, i, squares) 
      print('future ', i, '= >', future) 
      yield future 


def make_square_seq(future_seq): 
    """ 
     Generates the sequence of fulfilled a promises. 
    """ 

    # Just to copy iterator 
    for_show_1, for_show_2, future_seq = itertools.tee(future_seq, 3) 

    # Let's check it, May be it withdrawn =) 
    for i, future in enumerate(for_show_1): 
     print('future ', i, 'done [1] =>', future.done()) 

    # Try to keep its promises 
    for future in future_seq: 
     yield future.result() 

    # Let's check it one more time. It is faithful to! 
    for i, future in enumerate(for_show_2): 
     print('future ', i, 'done [2] =>', future.done()) 

    return future_seq 


def make_one_square(i, squares): 
    print('inside [1] = >', i, 'pid = ', os.getpid()) 
    squares[i, 0], squares[i, 1] = i, i ** 2 

    time.sleep(1) # Long and hard computation. 

    print('inside [2]= >', i, 'pid = ', os.getpid()) 
    return squares 


if __name__ == '__main__': 
    main() 

Слишком буква может быть. Это просто для объяснения. Это зависит, но для множества реальных примеров требуется только звонок future.result(). Проверьте эту страницу: concurrent.futures.html

Так что этот код будет порождающая нечто подобное:

$ python test_futures_1.py 
future 0 = > <Future at 0x7fc0dc758278 state=running> 
future 0 done [1] => False 
future 1 = > <Future at 0x7fc0dc758da0 state=pending> 
inside [1] = > 0 pid = 19364 
future 1 done [1] => False 
inside [1] = > 1 pid = 19365 
future 2 = > <Future at 0x7fc0dc758e10 state=pending> 
future 2 done [1] => False 
future 3 = > <Future at 0x7fc0dc758cc0 state=pending> 
inside [1] = > 2 pid = 19366 
future 3 done [1] => False 
future 4 = > <Future at 0x7fc0dc769048 state=pending> 
future 4 done [1] => False 
inside [1] = > 3 pid = 19367 
future 5 = > <Future at 0x7fc0dc758f60 state=running> 
future 5 done [1] => False 
future 6 = > <Future at 0x7fc0dc758fd0 state=pending> 
future 6 done [1] => False 
future 7 = > <Future at 0x7fc0dc7691d0 state=pending> 
future 7 done [1] => False 
future 8 = > <Future at 0x7fc0dc769198 state=pending> 
future 8 done [1] => False 
future 9 = > <Future at 0x7fc0dc7690f0 state=pending> 
future 9 done [1] => False 
future 10 = > <Future at 0x7fc0dc769438 state=pending> 
future 10 done [1] => False 
future 11 = > <Future at 0x7fc0dc7694a8 state=pending> 
future 11 done [1] => False 
future 12 = > <Future at 0x7fc0dc769550 state=pending> 
future 12 done [1] => False 
future 13 = > <Future at 0x7fc0dc7695f8 state=pending> 
future 13 done [1] => False 
future 14 = > <Future at 0x7fc0dc7696a0 state=pending> 
future 14 done [1] => False 
future 15 = > <Future at 0x7fc0dc769748 state=pending> 
future 15 done [1] => False 
future 16 = > <Future at 0x7fc0dc7697f0 state=pending> 
future 16 done [1] => False 
future 17 = > <Future at 0x7fc0dc769898 state=pending> 
future 17 done [1] => False 
future 18 = > <Future at 0x7fc0dc769940 state=pending> 
future 18 done [1] => False 
future 19 = > <Future at 0x7fc0dc7699e8 state=pending> 
future 19 done [1] => False 
inside [2]= > 0 pid = 19364 
inside [2]= > 1 pid = 19365 
inside [1] = > 4 pid = 19364 
inside [2]= > 2 pid = 19366 
inside [1] = > 5 pid = 19365 
inside [1] = > 6 pid = 19366 
inside [2]= > 3 pid = 19367 
inside [1] = > 7 pid = 19367 
inside [2]= > 4 pid = 19364 
inside [2]= > 5 pid = 19365 
inside [2]= > 6 pid = 19366 
inside [1] = > 8 pid = 19364 
inside [1] = > 9 pid = 19365 
inside [1] = > 10 pid = 19366 
inside [2]= > 7 pid = 19367 
inside [1] = > 11 pid = 19367 
inside [2]= > 8 pid = 19364 
inside [2]= > 9 pid = 19365 
inside [2]= > 10 pid = 19366 
inside [2]= > 11 pid = 19367 
inside [1] = > 13 pid = 19366 
inside [1] = > 12 pid = 19364 
inside [1] = > 14 pid = 19365 
inside [1] = > 15 pid = 19367 
inside [2]= > 14 pid = 19365 
inside [2]= > 13 pid = 19366 
inside [2]= > 12 pid = 19364 
inside [2]= > 15 pid = 19367 
inside [1] = > 16 pid = 19365 
inside [1] = > 17 pid = 19364 
inside [1] = > 18 pid = 19367 
inside [1] = > 19 pid = 19366 
inside [2]= > 16 pid = 19365 
inside [2]= > 18 pid = 19367 
inside [2]= > 17 pid = 19364 
inside [2]= > 19 pid = 19366 
future 0 done [2] => True 
future 1 done [2] => True 
future 2 done [2] => True 
future 3 done [2] => True 
future 4 done [2] => True 
future 5 done [2] => True 
future 6 done [2] => True 
future 7 done [2] => True 
future 8 done [2] => True 
future 9 done [2] => True 
future 10 done [2] => True 
future 11 done [2] => True 
future 12 done [2] => True 
future 13 done [2] => True 
future 14 done [2] => True 
future 15 done [2] => True 
future 16 done [2] => True 
future 17 done [2] => True 
future 18 done [2] => True 
future 19 done [2] => True 
+0

Да, но, как я понимаю, первоначальная проблема заключается не в инициализации 'квадратов'. Я полагаю, что настоящая проблема заключается в понимании того, как работать с «PoolExecutor». –

+0

Спасибо! Pls видит изменения в оригинальной записи. – zazizoma

+0

@donkopotamus, пожалуйста, up-down-vote this answer =) –

Смежные вопросы