Что вы действительно хотите - это каким-то образом передать исключения до родительского процесса, верно? Тогда вы можете справиться с ними, как хотите.
Если вы используете concurrent.futures.ProcessPoolExecutor
, это автоматический. Если вы используете multiprocessing.Pool
, это тривиально. Если вы используете явные Process
и Queue
, вам нужно немного поработать, но это не , что много.
Например:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1/0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
except Exception as e:
self.outputQueue.put(e)
Затем ваш вызывающий код может просто читать Exception
S из очереди, как все остальное. Вместо этого:
yield outq.pop()
сделать это:
result = outq.pop()
if isinstance(result, Exception):
raise result
yield result
(я не знаю, что ваш фактический код родительского процесса очереди чтение делает, потому что ваш минимальный образец просто игнорирует очереди, но с надеждой. это объясняет идею, хотя ваш реальный код на самом деле не работает.)
Предполагается, что вы хотите прервать любое необработанное исключение, которое делает его до run
. Если вы хотите передать исключение и перейти к следующему i in iter
, просто переместите try
в for
, а не вокруг него.
Это также предполагает, что Exception
s недопустимые значения.Если это проблема, самое простое решение, чтобы просто нажать (result, exception)
кортежи:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1/0 # Dumb error
# (more code that does stuff)
self.outputQueue.put((result, None))
except Exception as e:
self.outputQueue.put((None, e))
Тогда ваш код поппинг делает это:
result, exception = outq.pop()
if exception:
raise exception
yield result
Вы можете заметить, что это похоже на Node.js обратного вызова стиль, где вы передаете (err, result)
каждому обратному вызову. Да, это раздражает, и вы собираетесь испортить код в этом стиле. Но вы не используете его нигде, кроме как в обертке; весь ваш код «уровня приложения», который получает значения из очереди или получает вызов внутри run
, просто видит нормальные доходности/доходность и поднятые исключения.
Возможно, вы захотите рассмотреть вопрос о создании Future
по спецификации concurrent.futures
(или используя этот класс как есть), даже если вы выполняете свою работу и выполняете ее вручную. Это не так сложно, и это дает вам очень хороший API, особенно для отладки.
Наконец, стоит отметить, что большинство кода, построенного вокруг рабочих и очередей, можно сделать намного проще с дизайном-исполнителем/пулом, даже если вы абсолютно уверены, что хотите только одного рабочего на одну очередь. Просто отбросьте весь шаблон и поверните петлю в методе Worker.run
в функцию (которая только return
s или raise
s как обычно, вместо добавления в очередь). На вызывающей стороне снова отбросьте весь шаблон и только submit
или map
функцию задания с ее параметрами.
весь Ваш пример может быть уменьшена до:
def job(i):
# (code that does stuff)
1/0 # Dumb error
# (more code that does stuff)
return result
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
results = executor.map(job, range(10))
И он будет автоматически обрабатывать исключения должным образом.
Как вы упомянули в комментариях, трассировка для исключения не отскакивает назад в дочерний процесс; это доходит только до руководства raise result
(или, если вы используете пул или исполнитель, кишки пула или исполнителя).
Причина заключается в том, что multiprocessing.Queue
построен на вершине pickle
, а исключения травления не раскалывают их следы. И причина в том, что вы не можете рассортировать трассировки. И причина этого в том, что трассировки полны ссылок на контекст локального исполнения, поэтому их работа в другом процессе будет очень сложной.
Итак ... что вы можете сделать по этому поводу? Не ищите полностью общего решения. Вместо этого подумайте о том, что вам действительно нужно. В 90% случаев вам нужно «зарегистрировать исключение с отслеживанием и продолжить» или «распечатать исключение с отслеживанием до stderr
и exit(1)
как обработчик обработанных по умолчанию необработанных исключений». Для любого из них вам вообще не нужно исключать исключение; просто отформатируйте его на дочерней стороне и передайте строку. Если вам do нужно что-то более причудливое, выработайте именно то, что вам нужно, и передайте достаточно информации, чтобы вручную собрать это вместе. Если вы не знаете, как отформатировать трассировку и исключения, см. Модуль traceback
. Это довольно просто. И это означает, что вам не нужно вообще попадать в машину для рассола. (Не то, чтобы было очень сложно copyreg
pickler или написать класс держателя с методом __reduce__
или что-нибудь еще, но если вам не нужно, зачем это все узнавать?)
Вы можете оставить минимальный тестовый пример, который иллюстрирует эту проблему? – Blender
@Blender Да. Добавлен код. – hendra