2012-02-09 3 views
6

Я пытался сопоставить некоторый код с помощью concurrent.futures.ProcessPoolExecutor, но у вас были странные взаимоблокировки, которые не встречаются с ThreadPoolExecutor. Минимальный пример:Тупик в файле concurrent.futures

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     executor.submit(test) 

В питон 3.2.2 (на 64-битной Ubuntu), это, кажется, висит постоянно после подачи всех рабочих мест - и это, кажется, происходит всякий раз, когда число рабочих мест, представленных выше, чем количество работников. Если я заменил ProcessPoolExecutor на ThreadPoolExecutor, он работает безупречно.

В попытке исследовать, я дал каждое будущее обратный вызов для печати значения i:

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test) 

     def callback(f): 
      print('callback {}'.format(i)) 
     future.add_done_callback(callback) 

Это просто смутил меня еще больше - значение i распечатывается callback этим значение на время, которое оно называется, а не в то время, когда оно было определено (поэтому я никогда не вижу callback 0, но я получаю много callback 99). Опять же, ThreadPoolExecutor распечатывает ожидаемое значение.

Удивление, если это может быть ошибка, я попробовал недавнюю версию python. Теперь код, по крайней мере, заканчивается, но я все равно получаю неправильное значение i.

Так может кто-нибудь объяснить:

  • , что случилось с ProcessPoolExecutor между питоном 3.2 и текущей версией Dev, что, по-видимому фиксированной из этого тупика

  • почему «неправильное» значение i печатаются

EDIT: в jukiewicz указал ниже, конечно печать i будет печатать значение во время вызова обратного вызова, я не знаю, что я думал ... если я передаю вызываемый объект со значением i в качестве одного из его атрибутов, который работает так, как ожидалось.

EDIT: немного больше информации: все обратные вызовы выполняются, поэтому он выглядит как executor.shutdown (вызываемый executor.__exit__), который не может сказать, что процессы завершены. Это, похоже, полностью исправлено в текущем python 3.3, но, похоже, было много изменений в multiprocessing и concurrent.futures, поэтому я не знаю, что исправлено. Поскольку я не могу использовать 3.3 (он, похоже, не совместим ни с версией, ни с версией numpy), я попробовал просто скопировать многопроцессорные и параллельные пакеты на мою установку 3.2, которая, кажется, работает нормально. Тем не менее, кажется немного странным, что, насколько я вижу, ProcessPoolExecutor полностью нарушен в последней версии, но никто не пострадал.

+1

Что касается второго, естественно, что процессы печатают '99'. символ 'i' связан глобальным контекстом, и создание новых процессов является дорогостоящим, поэтому к тому времени, когда вы получите что-нибудь,' i == 99'. – julkiewicz

+1

Кроме того, у меня Ubuntu 64-бит, Python 3.2.2 и первый фрагмент кода не зависает ... – julkiewicz

+0

@julkiewicz: это очень странно. Я только что попробовал это на другой машине с 64-разрядной Scientific Linux и python 3.2.2, и она застопорилась после того, как напечатала «отправку 99» на 10 попыток из 10. Я даже попробовал обернуть код в 'if __name__ == ' __main __ '', как я слышал, что необходимо для многопроцессорности в Windows. – James

ответ

2

Я изменил код следующим образом, который решил обе проблемы. callback функция была определена как замыкание, поэтому каждый раз будет использоваться обновленное значение i. Что касается тупика, это может стать причиной закрытия Исполнителя до того, как все задание будет завершено. В ожидании завершения фьючерса это тоже.

from concurrent import futures 

def test(i): 
    return i 

def callback(f): 
    print('callback {}'.format(f.result())) 


with futures.ProcessPoolExecutor(4) as executor: 
    fs = [] 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test, i) 
     future.add_done_callback(callback) 
     fs.append(future) 

    for _ in futures.as_completed(fs): pass 

ОБНОВЛЕНИЕ: О, извините, я не читал ваши обновления, это, похоже, уже решено.

Смежные вопросы