2012-06-15 4 views
9

При запуске большого количества задач (с большими параметрами) с использованием Pool.apply_async процессы распределяются и переходят в состояние ожидания, и нет ограничений для количества ожидающих процессов. Это может в конечном итоге, употребляя всю память, как в примере ниже:Многопроцессорность Python: как ограничить количество ожидающих процессов?

import multiprocessing 
import numpy as np 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(): 

    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 

Я ищу способ ограничить очереди ожидания, таким образом, что существует лишь ограниченное число процессов, ожидающих, и Pool.apply_async заблокирован, пока очередь ожидания заполнена.

+0

Приятный пример (+1). – mgilson

ответ

6

multiprocessing.Pool имеет член _taskqueue типа multiprocessing.Queue, который принимает необязательный параметр maxsize; к сожалению, он создает его без набора параметров maxsize.

Я бы рекомендовал подклассы multiprocessing.Pool с копией пастой multiprocessing.Pool.__init__maxsize, который проходит в _taskqueue конструкторе.

Обезьяны заплат объект (либо бассейн, либо очереди) также будет работать, но вы должны были бы monkeypatch pool._taskqueue._maxsize и pool._taskqueue._sem поэтому было бы довольно хрупким:

pool._taskqueue._maxsize = maxsize 
pool._taskqueue._sem = BoundedSemaphore(maxsize) 
+1

Я использую Python 2.7.3, а _taskqueue имеет тип Queue.Queue. Это означает, что это простая очередь, а не многопроцессорная. Subclassing multiprocessing.Pool и overriding __init__ работает нормально, но обезглавливание объекта не работает должным образом. Однако, это хак, который я искал, спасибо. –

0

Вы можете добавить явную Queue с параметром maxsize и использовать queue.put() вместо pool.apply_async() в этом случае. Тогда рабочие процессы могли:

for a, b in iter(queue.get, sentinel): 
    # process it 

Если вы хотите ограничить количество создаваемых входных аргументов/результаты, которые находятся в памяти примерно числа активных рабочих процессов, то вы могли бы использовать pool.imap*() методы:

#!/usr/bin/env python 
import multiprocessing 
import numpy as np 

def f(a_b): 
    return np.linalg.solve(*a_b) 

def main(): 
    args = ((np.random.rand(1000,1000), np.random.rand(1000)) 
      for _ in range(1000)) 
    p = multiprocessing.Pool() 
    for result in p.imap_unordered(f, args, chunksize=1): 
     pass 
    p.close() 
    p.join() 

if __name__ == '__main__': 
    main() 
+0

Использование 'imap' не имеет значения. Входная очередь по-прежнему не ограничена, и использование этого решения закончится тем, что будет потреблять всю память. – Radim

+0

@Radim: код «imap» в ответе работает, даже если вы даете ему бесконечный генератор. – jfs

+0

Не в Python 2, к сожалению (не смотрели на код в py3). Для некоторых работ см. [Этот ответ SO] (http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration). – Radim

1

Подождите, пожалуйста, если pool._taskqueue над требуемым размером:

import multiprocessing 
import numpy as np 
import time 

def f(a,b): 
    return np.linalg.solve(a,b) 

def test(max_apply_size=100): 
    p = multiprocessing.Pool() 
    for _ in range(1000): 
     p.apply_async(f, (np.random.rand(1000,1000),np.random.rand(1000))) 

     while pool._taskqueue.qsize() > max_apply_size: 
      time.sleep(1) 

    p.close() 
    p.join() 

if __name__ == '__main__': 
    test() 
+0

Просто хочу добавить, что я нашел это самым легким решением проблем с памятью при многопроцессорной обработке. Я использовал max_apply_size = 10, и это отлично работает для моей проблемы, что является медленным преобразованием файлов. Использование семафора в качестве предложения @ecatmur кажется более надежным решением, но может быть излишним для простых скриптов. – Nate

Смежные вопросы