2016-01-13 4 views
0

Я использую многопроцессорную обработку с пулами. Мне нужно передать структуру как аргумент функции, которая должна использоваться в отдельных процессах. Я столкнулся с проблемой с функциями отображения multiprocessing.Pool, так как я не могу дублировать ни Pool.Queue, ни Pool.Array. Эта структура должна использоваться «на лету» для регистрации результатов каждого завершенного процесса. Вот мой код:Многопроцессорный пул и очереди

import multiprocessing 
from multiprocessing import Process, Manager, Queue, Array 
import itertools 
import time 

def do_work(number, out_queue=None): 
    if out_queue is not None: 
     print "Treated nb ", number 
     out_queue.append("Treated nb " + str(number)) 
    return 0 


def multi_run_wrapper(iter_values): 
    return do_work(*iter_values) 

def test_pool(): 
    # Get the max cpu 
    nb_proc = multiprocessing.cpu_count() 

    pool = multiprocessing.Pool(processes=nb_proc) 
    total_tasks = 16 
    tasks = range(total_tasks) 

    out_queue= Queue() # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function. 
    out_array = Array('i', total_tasks) 
    iter_values = itertools.izip(tasks, itertools.repeat(out_array)) 
    results = pool.map_async(multi_run_wrapper, iter_values) 

    pool.close() 
    pool.join() 
    print results._value 
    while not out_queue.empty(): 
     print "queue: ", out_queue.get() 
    print "out array: \n", out_array 

if __name__ == "__main__": 
    test_pool() 

Мне нужно запустить рабочий в отдельном процессе и передать свою очередь вывода в качестве аргумента. Я также хочу указать пул, содержащий ограниченное число запущенных процессов. Для этого я использую функцию pool.map_async(). К сожалению, часть кода выше, дает мне ошибку:

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner 
    self.run() 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks 
    put(task) 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__ 
    assert_spawning(self) 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning 
    ' through inheritance' % type(self).__name__ 
RuntimeError: Queue objects should only be shared between processes through inheritance 

Я считаю, это потому, что Queue не могут быть скопированы, когда-либо, как я прочитал в док. Тогда я подумал о том, чтобы сделать очередь глобальной переменной, чтобы мне не нужно было ее передавать, но это было бы так беспорядочно на мой взгляд. Я также думал используя multiprocessing.Array вместо

out_array = Array('i', total_tasks) 

но та же ошибка будет подступиться, как с очередями:

# ... 
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance 

мне нужно использовать эту особенность - использование многопроцессорных и обмен информацию от подпроцессов - в относительно большом программном обеспечении, поэтому я хочу, чтобы мой код оставался чистым и аккуратным.

Как я могу передать очередь своему работнику элегантным способом?

Конечно, любой другой способ справиться с основной спецификацией приветствуется.

ответ

3

multiprocessing.Pool не принимает multiprocessing.Queue в качестве аргумента в своей рабочей очереди. Я считаю, что это связано с тем, что он внутренне использует очереди для отправки данных назад и вперед для рабочих процессов. Есть несколько обходных путей:

1) Вам действительно нужно использовать очередь? Одним из преимуществ функции Pool является то, что их возвращаемые значения возвращаются к основным процессам. Обычно лучше перебирать возвращаемые значения из пула, чем использовать отдельную очередь. Это также позволяет избежать появления состояния гонки, проверив queue.empty()

2) Если вы должны использовать Queue, вы можете использовать его с multiprocessing.Manager. Это прокси для общей очереди, которая может быть передана в качестве аргумента функции Pool.

3) Вы можете передать нормальный Queue к рабочим процессам с использованием initializer при создании Pool (как https://stackoverflow.com/a/3843313). Это своего рода хакки.

Состояние гонки я уже упоминал выше, происходит от:

while not out_queue.empty(): 
    print "queue: ", out_queue.get() 

Если у вас есть рабочие процессы заполнения вашей очереди, вы можете иметь состояние, когда ваша очередь пуста, потому что работник собирается поставить что-то внутрь. Если вы проверите .empty() в это время, вы скоро закончите.Лучше всего поставить sentinal значения в вашей очереди, чтобы сигнализировать о завершении ввода данных в него.

+0

Вы дали мне разные ценные советы, спасибо. Будьте уверены, условие гонки используется как фиктивный тест;) – kaligne

+0

он принял бы его, если он передан как init. см. [здесь] (http://stackoverflow.com/questions/3827065/can-i-use-a-multiprocessing-queue-in-a-function-called-by-pool-imap?rq=1) –

Смежные вопросы