2016-04-01 2 views
2

В python2.7 многопроцессорность.Queue выдает сломанную ошибку при инициализации изнутри функции. Я предоставляю минимальный пример, который воспроизводит проблему.Ошибка обрыва трубы с многопроцессорной обработкой.Queue

#!/usr/bin/python 
# -*- coding: utf-8 -*- 

import multiprocessing 

def main(): 
    q = multiprocessing.Queue() 
    for i in range(10): 
     q.put(i) 

if __name__ == "__main__": 
    main() 

бросает под сломанным ошибку трубы

Traceback (most recent call last): 
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed 
send(obj) 
IOError: [Errno 32] Broken pipe 

Process finished with exit code 0 

Я не могу расшифровать, почему. Было бы странно, что мы не можем заполнять объекты Queue внутри функции.

ответ

3

Что здесь происходит, что при вызове main(), он создает Queue, положить 10 объектов в нем и заканчивается функция, мусор сбора всех своих внутренних переменных и объектов, в том числе Queue. НО вы получите эту ошибку, потому что вы по-прежнему пытаетесь отправить последний номер в Queue.

из документации documentation:

«Когда процесс первого помещает элемент в очереди питатель поток начал, который передает объекты из буфера в трубу»

Как put() сделана в другом потоке, он не блокирует выполнение скрипта, и позволяет завершает функцию main() до завершения операции Очереди.

Попробуйте это:

#!/usr/bin/python 
# -*- coding: utf-8 -*- 

import multiprocessing 
import time 
def main(): 
    q = multiprocessing.Queue() 
    for i in range(10): 
     print i 
     q.put(i) 
    time.sleep(0.1) # Just enough to let the Queue finish 

if __name__ == "__main__": 
    main() 

Там должен быть способ join очереди или выполнение блока, пока объект не будет введен в Queue, вы должны посмотреть в документации.

+0

Замечательный ответ. Я собираюсь дать еще один, заявив, что в python3 этого не произойдет. – hAcKnRoCk

0

С задержкой, используя time.sleep(0.1) как предложено @HarryPotFleur, эта проблема решена. Тем не менее, я тестировал код с python3, и проблема с поврежденным каналом вообще не происходит в python3. Я думаю, что это было сообщено как ошибка, а позже исправлено.

+0

Это не правда, ** этого не происходит в python3 вообще. Более 'time.sleep (0.1)' не решает! Это было просто для понимания! –

1

Когда вы запускаете Queue.put(), неявный поток запускается для доставки данных в очередь. Между тем основное приложение завершено и нет конечной станции для данных (объект очереди собран из мусора).

Я хотел бы попробовать это:

from multiprocessing import Queue 

def main(): 
    q = Queue() 
    for i in range(10): 
     print i 
     q.put(i) 
    q.close() 
    q.join_thread() 

if __name__ == "__main__": 
    main() 

join_thread() обеспечивает все данные в буфере были покраснели. close() должно быть наречено до join_thread()

Смежные вопросы