2013-03-20 10 views
7

Я использую Python 2.7.3. Я распараллеливал некоторый код, используя подклассы multiprocessing.Process объектов. Если ошибок в коде в моих объектах Process подкласса, все работает нормально. Но если в моем подклассе объектов Process есть ошибки в коде, они, по-видимому, будут кратковременно разбиваться (на стеке не будет напечатано ни одного файла stacktrace), а загрузка процессора снизится до нуля. Родительский код никогда не сбой, создавая впечатление, что выполнение просто висит. Между тем, действительно сложно отследить, где ошибка в коде, потому что не указана информация о том, где ошибка.Процесс многопроцессорности Python бесшумно отключается

Я не могу найти другие вопросы о stackoverflow, которые касаются одной и той же проблемы.

Я думаю, что объекты класса подкласса, как представляется, аварийно завершатся, потому что они не могут печатать сообщение об ошибке в оболочке родителя, но я хотел бы знать, что я могу с этим сделать, чтобы я мог, по крайней мере, отлаживать более эффективно (и так, чтобы другие пользователи моего кода могли сказать мне, когда они сталкиваются с проблемами тоже).

EDIT: мой фактический код слишком сложен, но тривиальный пример подклассы объекта процесса с ошибкой в ​​нем будет что-то вроде этого:

from multiprocessing import Process, Queue 

class Worker(Process): 

    def __init__(self, inputQueue, outputQueue): 

     super(Worker, self).__init__() 

     self.inputQueue = inputQueue 
     self.outputQueue = outputQueue 

    def run(self): 

     for i in iter(self.inputQueue.get, 'STOP'): 

      # (code that does stuff) 

      1/0 # Dumb error 

      # (more code that does stuff) 

      self.outputQueue.put(result) 
+2

Вы можете оставить минимальный тестовый пример, который иллюстрирует эту проблему? – Blender

+0

@Blender Да. Добавлен код. – hendra

ответ

12

Что вы действительно хотите - это каким-то образом передать исключения до родительского процесса, верно? Тогда вы можете справиться с ними, как хотите.

Если вы используете concurrent.futures.ProcessPoolExecutor, это автоматический. Если вы используете multiprocessing.Pool, это тривиально. Если вы используете явные Process и Queue, вам нужно немного поработать, но это не , что много.

Например:

def run(self): 
    try: 
     for i in iter(self.inputQueue.get, 'STOP'): 
      # (code that does stuff) 
      1/0 # Dumb error 
      # (more code that does stuff) 
      self.outputQueue.put(result) 
    except Exception as e: 
     self.outputQueue.put(e) 

Затем ваш вызывающий код может просто читать Exception S из очереди, как все остальное. Вместо этого:

yield outq.pop() 

сделать это:

result = outq.pop() 
if isinstance(result, Exception): 
    raise result 
yield result 

(я не знаю, что ваш фактический код родительского процесса очереди чтение делает, потому что ваш минимальный образец просто игнорирует очереди, но с надеждой. это объясняет идею, хотя ваш реальный код на самом деле не работает.)

Предполагается, что вы хотите прервать любое необработанное исключение, которое делает его до run. Если вы хотите передать исключение и перейти к следующему i in iter, просто переместите try в for, а не вокруг него.

Это также предполагает, что Exception s недопустимые значения.Если это проблема, самое простое решение, чтобы просто нажать (result, exception) кортежи:

def run(self): 
    try: 
     for i in iter(self.inputQueue.get, 'STOP'): 
      # (code that does stuff) 
      1/0 # Dumb error 
      # (more code that does stuff) 
      self.outputQueue.put((result, None)) 
    except Exception as e: 
     self.outputQueue.put((None, e)) 

Тогда ваш код поппинг делает это:

result, exception = outq.pop() 
if exception: 
    raise exception 
yield result 

Вы можете заметить, что это похоже на Node.js обратного вызова стиль, где вы передаете (err, result) каждому обратному вызову. Да, это раздражает, и вы собираетесь испортить код в этом стиле. Но вы не используете его нигде, кроме как в обертке; весь ваш код «уровня приложения», который получает значения из очереди или получает вызов внутри run, просто видит нормальные доходности/доходность и поднятые исключения.

Возможно, вы захотите рассмотреть вопрос о создании Future по спецификации concurrent.futures (или используя этот класс как есть), даже если вы выполняете свою работу и выполняете ее вручную. Это не так сложно, и это дает вам очень хороший API, особенно для отладки.

Наконец, стоит отметить, что большинство кода, построенного вокруг рабочих и очередей, можно сделать намного проще с дизайном-исполнителем/пулом, даже если вы абсолютно уверены, что хотите только одного рабочего на одну очередь. Просто отбросьте весь шаблон и поверните петлю в методе Worker.run в функцию (которая только return s или raise s как обычно, вместо добавления в очередь). На вызывающей стороне снова отбросьте весь шаблон и только submit или map функцию задания с ее параметрами.

весь Ваш пример может быть уменьшена до:

def job(i): 
    # (code that does stuff) 
    1/0 # Dumb error 
    # (more code that does stuff) 
    return result 

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: 
    results = executor.map(job, range(10)) 

И он будет автоматически обрабатывать исключения должным образом.

Как вы упомянули в комментариях, трассировка для исключения не отскакивает назад в дочерний процесс; это доходит только до руководства raise result (или, если вы используете пул или исполнитель, кишки пула или исполнителя).

Причина заключается в том, что multiprocessing.Queue построен на вершине pickle, а исключения травления не раскалывают их следы. И причина в том, что вы не можете рассортировать трассировки. И причина этого в том, что трассировки полны ссылок на контекст локального исполнения, поэтому их работа в другом процессе будет очень сложной.

Итак ... что вы можете сделать по этому поводу? Не ищите полностью общего решения. Вместо этого подумайте о том, что вам действительно нужно. В 90% случаев вам нужно «зарегистрировать исключение с отслеживанием и продолжить» или «распечатать исключение с отслеживанием до stderr и exit(1) как обработчик обработанных по умолчанию необработанных исключений». Для любого из них вам вообще не нужно исключать исключение; просто отформатируйте его на дочерней стороне и передайте строку. Если вам do нужно что-то более причудливое, выработайте именно то, что вам нужно, и передайте достаточно информации, чтобы вручную собрать это вместе. Если вы не знаете, как отформатировать трассировку и исключения, см. Модуль traceback. Это довольно просто. И это означает, что вам не нужно вообще попадать в машину для рассола. (Не то, чтобы было очень сложно copyreg pickler или написать класс держателя с методом __reduce__ или что-нибудь еще, но если вам не нужно, зачем это все узнавать?)

+1

Спасибо! Отлично. Но есть ли способ распечатать всю трассировку стека? Он говорит мне, что сейчас есть ошибка, и что это такое, но не WHERE в классе Worker ошибка возникает. – hendra

+0

@npo: Я добавлю ответ, чтобы объяснить это. – abarnert

+0

Как это можно применить к 'apply_async', который просто использует функцию, предназначенную для возврата некоторого результата в обратный вызов. Мы просто обертываем внутренности асинхронной функции в try/except, а затем просто возвращаем объект исключения в обратный вызов? – CMCDragonkai

1

Это не ответ, просто расширенный комментарий. Пожалуйста, запустите эту программу скажите нам, что выход (если таковые имеются), вы получите:

from multiprocessing import Process, Queue 

class Worker(Process): 

    def __init__(self, inputQueue, outputQueue): 

     super(Worker, self).__init__() 

     self.inputQueue = inputQueue 
     self.outputQueue = outputQueue 

    def run(self): 

     for i in iter(self.inputQueue.get, 'STOP'): 

      # (code that does stuff) 

      1/0 # Dumb error 

      # (more code that does stuff) 

      self.outputQueue.put(result) 

if __name__ == '__main__': 
    inq, outq = Queue(), Queue() 
    inq.put(1) 
    inq.put('STOP') 
    w = Worker(inq, outq) 
    w.start() 

я получаю:

% test.py 
Process Worker-1: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/home/unutbu/pybin/test.py", line 21, in run 
    1/0 # Dumb error 
ZeroDivisionError: integer division or modulo by zero 

Я удивлен (если) вы не получите ничего.

+0

Я был бы удивлен, если бы он ничего не получил на POSIX в оболочке. Но в Windows или в IDLE или PyDev, или если родительский процесс является графическим приложением ... Я бы не стал делать ставки в любом случае ... – abarnert

+0

@unutbu Я ничего не получил. Использование 64-битной Windows и IDLE. – hendra

+0

@npo: Хорошо, и что произойдет, если вы запустите его с консоли? – unutbu

2

Я предлагаю такой обходной путь для показа исключения процесса в

from multiprocessing import Queue, Process, RawValue, Semaphore, Lock, Pool 
import traceback 
run_old = Process.run 

def run_new(*args, **kwargs): 
    try: 
     run_old(*args, **kwargs) 
    except (KeyboardInterrupt, SystemExit): 
     raise 
    except: 
     traceback.print_exc(file=sys.stdout) 

Process.run = run_new 
+0

простой, лучший ответ – CloudyGloudy

Смежные вопросы