Я использую многопроцессорную обработку с пулами. Мне нужно передать структуру как аргумент функции, которая должна использоваться в отдельных процессах. Я столкнулся с проблемой с функциями отображения multiprocessing.Pool
, так как я не могу дублировать ни Pool.Queue
, ни Pool.Array
. Эта структура должна использоваться «на лету» для регистрации результатов каждого завершенного процесса. Вот мой код:Многопроцессорный пул и очереди
import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time
def do_work(number, out_queue=None):
if out_queue is not None:
print "Treated nb ", number
out_queue.append("Treated nb " + str(number))
return 0
def multi_run_wrapper(iter_values):
return do_work(*iter_values)
def test_pool():
# Get the max cpu
nb_proc = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=nb_proc)
total_tasks = 16
tasks = range(total_tasks)
out_queue= Queue() # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function.
out_array = Array('i', total_tasks)
iter_values = itertools.izip(tasks, itertools.repeat(out_array))
results = pool.map_async(multi_run_wrapper, iter_values)
pool.close()
pool.join()
print results._value
while not out_queue.empty():
print "queue: ", out_queue.get()
print "out array: \n", out_array
if __name__ == "__main__":
test_pool()
Мне нужно запустить рабочий в отдельном процессе и передать свою очередь вывода в качестве аргумента. Я также хочу указать пул, содержащий ограниченное число запущенных процессов. Для этого я использую функцию pool.map_async()
. К сожалению, часть кода выше, дает мне ошибку:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
assert_spawning(self)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
Я считаю, это потому, что Queue
не могут быть скопированы, когда-либо, как я прочитал в док. Тогда я подумал о том, чтобы сделать очередь глобальной переменной, чтобы мне не нужно было ее передавать, но это было бы так беспорядочно на мой взгляд. Я также думал используя multiprocessing.Array
вместо
out_array = Array('i', total_tasks)
но та же ошибка будет подступиться, как с очередями:
# ...
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
мне нужно использовать эту особенность - использование многопроцессорных и обмен информацию от подпроцессов - в относительно большом программном обеспечении, поэтому я хочу, чтобы мой код оставался чистым и аккуратным.
Как я могу передать очередь своему работнику элегантным способом?
Конечно, любой другой способ справиться с основной спецификацией приветствуется.
Вы дали мне разные ценные советы, спасибо. Будьте уверены, условие гонки используется как фиктивный тест;) – kaligne
он принял бы его, если он передан как init. см. [здесь] (http://stackoverflow.com/questions/3827065/can-i-use-a-multiprocessing-queue-in-a-function-called-by-pool-imap?rq=1) –