2015-03-06 5 views
3

У меня есть подпроцесс через multiprocessing.Process и очередь через multiprocessing.Queue.multiprocessing.Queue висит, когда процесс умирает

Основной процесс использует multiprocessing.Queue.get() для получения новых данных. Я не хочу, чтобы там был тайм-аут, и я хочу, чтобы он блокировал.

Однако, когда дочерний процесс умирает по какой-либо причине (вручную убит пользователем через kill, или segfault и т. Д.), Queue.get() просто будет висеть вечно.

Как я могу избежать этого?

ответ

4

Я думаю, multiprocessing.Queue не то, что я хочу.

Я использую сейчас

parent_conn, child_conn = multiprocessing.Pipe(duplex=True) 

получить два multiprocessing.Connection объектов. Затем I os.fork() или используйте multiprocessing.Process. В детстве я:

parent_conn.close() 
# read/write on child_conn 

В родителю (после развилки), я:

child_conn.close() 
# read/write on parent_conn 

Таким образом, когда я называю recv() на связи, он сгенерирует исключение (EOFError), когда ребенок/родитель умирает тем временем.

Обратите внимание, что это работает только для одного ребенка. Я думаю, что Queue означает, когда вы хотите несколько дочерних элементов. В этом случае у вас, вероятно, будет какой-то менеджер, который следит за тем, живы ли все дети и перезапускает их соответственно.

+0

Хороший улов. Я попробовал «Pipe», когда отвечал здесь, но у меня не получилось EOF-ошибка, когда клиент умер. Я думаю, что забыл закрыть связь. –

1

Queue не имеет возможности узнать, когда у него больше нет писателей. Вы можете передать объект на любое количество подпроцессов, и он не знает, передал ли он его любому данному подпроцессу. Поэтому придется ждать, даже если подпроцесс умирает. Очередь не файловый дескриптор, который автоматически закрывается, когда ребенок умирает.

Что вы ищете - это какой-то супервизор в родительском процессе, который замечает, что дети умирают неожиданно и обрабатывают эту ситуацию так, как вы считаете нужным. Вы можете сделать это, поймав процесс SIGCHLD, проверив Process.is_alive или используя Process.join в теме. Простая реализация будет использовать параметр timeout в вызове Queue.get и выполнить проверку Process.is_alive, когда это произойдет.

Если у вас есть немного больше контроля над смертью дочернего процесса, он должен отправить объект типа «EOF» (None или какой-либо маркер, который он сделан) в очередь, чтобы ваш родительский процесс мог обработайте его правильно.

+0

Ofc Я могу отправить EOF, но это никогда не будет охватывать все случаи, такие как «kill -9» пользователя, segfault или подобное. тайм-аут также не является реальным решением, потому что я не хочу, чтобы он вообще зависал - нет причин ни к чему. Я мог бы использовать SIGCHLD, но это похоже на перебор. Фоновая нить также кажется некоторой потерей ресурсов, чтобы сделать то, что также должно быть возможным более простым способом. – Albert

Смежные вопросы