2014-08-28 3 views
25

Я пытаюсь использовать частичную функцию, чтобы pool.map() мог нацеливать на функцию с более чем одним параметром (в данном случае объект Lock()).Python разделяет блокировку между процессами

Вот пример кода (взятый из ответа на предыдущий мой вопрос):

from functools import partial 

def target(lock, iterable_item): 
    for item in items: 
     # Do cool stuff 
     if (... some condition here ...): 
      lock.acquire() 
      # Write to stdout or logfile, etc. 
      lock.release() 

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    pool = multiprocessing.Pool() 
    l = multiprocessing.Lock() 
    func = partial(target, l) 
    pool.map(func, iterable) 
    pool.close() 
    pool.join() 

Однако, когда я запускаю этот код, я получаю ошибку:

Runtime Error: Lock objects should only be shared between processes through inheritance. 

Что я отсутствует здесь? Как я могу совместно использовать блокировку между моими подпроцессами?

+0

Существует еще один вопрос по этой же проблеме, хотя их конкретная ошибка отличается от других. [Неисправность использования блокировки с многопроцессорной обработкой.Pool: ошибка травления] (http://stackoverflow.com/questions/17960296/trouble-using-a- lock-with-multiprocessing-pool-pickling-error) –

ответ

46

Извините, я должен был поймать это в ответе на ваш другой вопрос. Вы не можете передать нормальные multiprocessing.Lock объектов методам Pool, потому что их нельзя мариновать. Есть два способа обойти это. Одним из них является создание Manager() и передать Manager.Lock():

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    pool = multiprocessing.Pool() 
    m = multiprocessing.Manager() 
    l = m.Lock() 
    func = partial(target, l) 
    pool.map(func, iterable) 
    pool.close() 
    pool.join() 

Это немного тяжеловесный, хотя; использование Manager требует нереста другого процесса для размещения сервера Manager. И все звонки в acquire/release шлюз должен быть отправлен на этот сервер через IPC.

Другой вариант - передать обычное multiprocessing.Lock() при создании пула, используя initializer kwarg. Это сделает ваш экземпляр блокировки глобальным во всех работающих детях:

def target(iterable_item): 
    for item in items: 
     # Do cool stuff 
     if (... some condition here ...): 
      lock.acquire() 
      # Write to stdout or logfile, etc. 
      lock.release() 
def init(l): 
    global lock 
    lock = l 

def main(): 
    iterable = [1, 2, 3, 4, 5] 
    l = multiprocessing.Lock() 
    pool = multiprocessing.Pool(initializer=init, initargs=(l,)) 
    pool.map(target, iterable) 
    pool.close() 
    pool.join() 

Второго решение имеет побочный эффект больше не требует partial.

+0

Еще раз спасибо, сэр. Это похоже на то, что мне нужно. Очень ценю продолжение помощи! Другие варианты выглядели супер. Я поеду с функцией инициализатора для совместного использования глобальной блокировки. – DJMcCarthy12

+0

Это сработало отлично. Я также поместил «Queue» в init, чтобы сохранить передачу в каждом вызове. – fantabolous

+1

@dano благодарит вас за ваш ответ, у меня тоже был такой же запрос, и он его полностью решает, однако у меня есть еще один запрос: почему этот подход часто не используется для совместного использования состояния между процессами, а не с помощью объекта Manager, который имеет собственные накладные расходы на запуск серверных процессов и прокси-доступ? – bawejakunal

Смежные вопросы