2013-07-21 3 views
3

В простом скрипте, который использует подпроцесс для вывода gzip (используя subprocess.PIPE для stdin внешней команды), если объект multiprocessing.Pool создается между моментом времени подпроцесс создается и stdin процесса закрывается, subprocess.wait() будет зависать вечно.Подпроцесс подпроцесса Python() терпит неудачу, если многопроцессор. Создается пул

import multiprocessing 
import subprocess 

proc = subprocess.Popen(["gzip", "-c", "-"], 
         stdout=open('filename', 'w'), stdin=subprocess.PIPE) 
multiprocessing.Pool() 
proc.stdin.close() 
proc.wait() 

Перемещение многопроцессорной обработки. Перемещение по одной линии вверх или одна строка вниз предотвращает проблему.

Я испытываю это на Python 2.7.3 (Linux) и Python 2.7.1 (OS X).

Очевидно, что это тривиальный пример - реальное использование намного сложнее. Я также уже знаю GzipFile - я бы предпочел не использовать его; использование подпроцесса позволяет мне увеличить загрузку процессора, разделив gzipping на отдельный поток.

Я не вижу, как просто создание пула должно иметь такое влияние.

ответ

7

При вызове multiprocessing.Pool модуль multiprocessing создает несколько новых процессов (с использованием os.fork или аналогичных).

По умолчанию во время fork новые процессы наследуют все дескрипторы открытых файлов.

Когда вы вызываете subprocess.Popen с аргументом subprocess.PIPE, модуль subprocess создает несколько новых дескрипторов файлов файлов для отправки данных в/из нового процесса. В этом конкретном случае труба используется для отправки данных из родительского процесса (python) в дочерний объект (gzip), а gzip выйдет - и, таким образом, закончит proc.wait(), когда все доступ на запись к трубе уходит. (Это то, что генерирует «EOF на трубе»: в этом канале не существует описательных файловых дескрипторов.)

Таким образом, в этом случае, если вы (все в «исходном» процессе python) выполните это в эта последовательность:

  1. создать трубу
  2. создать некоторые multiprocessing.Pool процессы
  3. отправить данные в GZIP
  4. закрыть трубу GZIP

, то из-за поведения fork каждый из процессов пула имеет os.dup канала write-to-gzip, поэтому gzip продолжает ожидать большего количества данных, которые эти процессы пула могут (но никогда не делать) отправлять. Процесс gzip будет завершен, как только пул завершит работу над своими дескрипторами.

Фиксация этого в реальном (более сложном) коде может быть нетривиальной. В идеале, вы хотите, чтобы multiprocessing.Pool знал (каким-то образом), какие файловые дескрипторы должны быть сохранены, а что не должно, но это не так просто, как «просто закрыть кучу дескрипторов в созданных дочерних процессах»:

output = open('somefile', 'a') 
def somefunc(arg): 
    ... do some computation, etc ... 
    output.write(result) 
pool = multiprocessing.Pool() 
pool.map(somefunc, iterable) 

Очевидно, output.fileno() должен использоваться совместно с рабочими процессами.

Вы можете попробовать использовать Pool «s initializer вызвать proc.stdin.close (или os.close в списке х Fd), но тогда вам нужно организовать, чтобы следить за дескрипторами закрывает. Вероятно, проще всего перестроить ваш код, чтобы избежать создания пула «в неподходящее время».

+0

Отлично! Спасибо за объяснение - по крайней мере теперь это имеет смысл (хотя это отстой). В моем случае записи многопроцессорности вообще не записываются в файлы, но реструктуризация кода будет ... немного больной. Похоже, что это, вероятно, единственный способ пойти. Я на самом деле не понял, что у вас могут быть дубликаты файловых дескрипторов. –

+0

Это не единственный способ: как я заметил, используя функцию инициализатора, вы можете закрыть определенные дескрипторы. Я просто подозреваю (без какого-либо кода, чтобы посмотреть), что это будет по крайней мере так же сложно, как избежать их открытости в первую очередь. :-) – torek