2012-01-01 2 views
5

Я пытаюсь написать, казалось бы, простую реализацию классического производителя - потребительского идиома в Python. Существует один сравнительно быстрый производитель для нескольких более медленных потребителей. В принципе, этот прост в использовании с использованием модуля Queue, а в библиотечной документации приведен пример появления нескольких строк кода.Производитель/потребитель Python с обработкой исключений

Однако, я также хочу, чтобы код работал правильно в случае возникновения исключений. Как производитель и все потребители должны прекратить в случае, если любой из следующих вещей произойдет:

  • производитель терпит неудачу с исключением
  • любой потребитель не может с исключением
  • пользователь останавливает программу (вызывая KeyboardInterrupt)

После этого весь процесс не должен поднимать начальное исключение, чтобы сообщить вызывающему абоненту о том, что пошло не так.

Основная проблема заключается в том, чтобы чисто закончить потребительский поток, не заканчивая в блокирующем соединении(). Похоже, популярно установить Thread.deamon = True, но для моего понимание этого приводит к утечкам ресурсов в случае сбоя производителя с исключением.

Мне удалось написать реализацию, которая соответствует моим требованиям (см. Ниже). Однако Я нахожу код намного сложнее, чем ожидалось.

Есть ли более простой способ справиться с этим сценарием?

Вот несколько примеров вызовов и полученный окончательный журнал сообщений от моего текущего реализации:

производства и потребления 10 наименований:

$ python procon.py 
INFO:root:processed all items 

не производят предметы:

$ python procon.py --items 0 
INFO:root:processed all items 

Производите 5 наименований для 10 потребителей, таким образом, используя только некоторые из доступных потребителей:

$ python procon.py --items 5 --consumers 10 
INFO:root:processed all items 

Прерывание по прижимного Control-C:

$ python procon.py 
^CWARNING:root:interrupted by user 

не приводят к пункту 3:

$ python procon.py --producer-fails-at 3 
ERROR:root:cannot produce item 3 

Отказ потреблять пункт 3:

$ python procon.py --consumer-fails-at 3 
ERROR:root:cannot consume item 3 

Нормально потреблять последний элемент:

$ python procon.py --items 10 --consumer-fails-at 9 
ERROR:root:cannot consume item 9 

И вот, вероятно, слишком сложный исходный код:

""" 
Consumer/producer to test exception handling in threads. Both the producer 
and the consumer can be made to fail deliberately when processing a certain 
item using command line options. 
""" 
import logging 
import optparse 
import Queue 
import threading 
import time 

_PRODUCTION_DELAY = 0.1 
_CONSUMPTION_DELAY = 0.3 

# Delay for ugly hacks and polling loops. 
_HACK_DELAY = 0.05 

class _Consumer(threading.Thread): 
    """ 
    Thread to consume items from an item queue filled by a producer, which can 
    be told to terminate in two ways: 

    1. using `finish()`, which keeps processing the remaining items on the 
     queue until it is empty 
    2. using `cancel()`, which finishes consuming the current item and then 
     terminates 
    """ 
    def __init__(self, name, itemQueue, failedConsumers): 
     super(_Consumer, self).__init__(name=name) 
     self._log = logging.getLogger(name) 
     self._itemQueue = itemQueue 
     self._failedConsumers = failedConsumers 
     self.error = None 
     self.itemToFailAt = None 
     self._log.info(u"waiting for items to consume") 
     self._isFinishing = False 
     self._isCanceled = False 

    def finish(self): 
     self._isFinishing = True 

    def cancel(self): 
     self._isCanceled = True 

    def consume(self, item): 
     self._log.info(u"consume item %d", item) 
     if item == self.itemToFailAt: 
      raise ValueError("cannot consume item %d" % item) 
     time.sleep(_CONSUMPTION_DELAY) 

    def run(self): 
     try: 
      while not (self._isFinishing and self._itemQueue.empty()) \ 
        and not self._isCanceled: 
       # HACK: Use a timeout when getting the item from the queue 
       # because between `empty()` and `get()` another consumer might 
       # have removed it. 
       try: 
        item = self._itemQueue.get(timeout=_HACK_DELAY) 
        self.consume(item) 
       except Queue.Empty: 
        pass 
      if self._isCanceled: 
       self._log.info(u"canceled") 
      if self._isFinishing: 
       self._log.info(u"finished") 
     except Exception, error: 
      self._log.error(u"cannot continue to consume: %s", error) 
      self.error = error 
      self._failedConsumers.put(self) 


class Worker(object): 
    """ 
    Controller for interaction between producer and consumers. 
    """ 
    def __init__(self, itemsToProduceCount, itemProducerFailsAt, 
      itemConsumerFailsAt, consumerCount): 
     self._itemsToProduceCount = itemsToProduceCount 
     self._itemProducerFailsAt = itemProducerFailsAt 
     self._itemConsumerFailsAt = itemConsumerFailsAt 
     self._consumerCount = consumerCount 
     self._itemQueue = Queue.Queue() 
     self._failedConsumers = Queue.Queue() 
     self._log = logging.getLogger("producer") 
     self._consumers = [] 

    def _possiblyRaiseConsumerError(self): 
      if not self._failedConsumers.empty(): 
       failedConsumer = self._failedConsumers.get() 
       self._log.info(u"handling failed %s", failedConsumer.name) 
       raise failedConsumer.error 

    def _cancelAllConsumers(self): 
     self._log.info(u"canceling all consumers") 
     for consumerToCancel in self._consumers: 
      consumerToCancel.cancel() 
     self._log.info(u"waiting for consumers to be canceled") 
     for possiblyCanceledConsumer in self._consumers: 
      # In this case, we ignore possible consumer errors because there 
      # already is an error to report. 
      possiblyCanceledConsumer.join(_HACK_DELAY) 
      if possiblyCanceledConsumer.isAlive(): 
       self._consumers.append(possiblyCanceledConsumer) 

    def work(self): 
     """ 
     Launch consumer thread and produce items. In case any consumer or the 
     producer raise an exception, fail by raising this exception 
     """ 
     self.consumers = [] 
     for consumerId in range(self._consumerCount): 
      consumerToStart = _Consumer(u"consumer %d" % consumerId, 
       self._itemQueue, self._failedConsumers) 
      self._consumers.append(consumerToStart) 
      consumerToStart.start() 
      if self._itemConsumerFailsAt is not None: 
       consumerToStart.itemToFailAt = self._itemConsumerFailsAt 

     self._log = logging.getLogger("producer ") 
     self._log.info(u"producing %d items", self._itemsToProduceCount) 

     for itemNumber in range(self._itemsToProduceCount): 
      self._possiblyRaiseConsumerError() 
      self._log.info(u"produce item %d", itemNumber) 
      if itemNumber == self._itemProducerFailsAt: 
       raise ValueError("ucannot produce item %d" % itemNumber) 
      # Do the actual work. 
      time.sleep(_PRODUCTION_DELAY) 
      self._itemQueue.put(itemNumber) 

     self._log.info(u"telling consumers to finish the remaining items") 
     for consumerToFinish in self._consumers: 
      consumerToFinish.finish() 
     self._log.info(u"waiting for consumers to finish") 
     for possiblyFinishedConsumer in self._consumers: 
      self._possiblyRaiseConsumerError() 
      possiblyFinishedConsumer.join(_HACK_DELAY) 
      if possiblyFinishedConsumer.isAlive(): 
       self._consumers.append(possiblyFinishedConsumer) 


if __name__ == "__main__": 
    logging.basicConfig(level=logging.INFO) 
    parser = optparse.OptionParser() 
    parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER", 
     type="long", help="number of items at which consumer fails (default: %default)") 
    parser.add_option("-i", "--items", metavar="NUMBER", type="long", 
     help="number of items to produce (default: %default)", default=10) 
    parser.add_option("-n", "--consumers", metavar="NUMBER", type="long", 
     help="number of consumers (default: %default)", default=2) 
    parser.add_option("-p", "--producer-fails-at", metavar="NUMBER", 
     type="long", help="number of items at which producer fails (default: %default)") 
    options, others = parser.parse_args() 
    worker = Worker(options.items, options.producer_fails_at, 
     options.consumer_fails_at, options.consumers) 
    try: 
     worker.work() 
     logging.info(u"processed all items") 
    except KeyboardInterrupt: 
     logging.warning(u"interrupted by user") 
     worker._cancelAllConsumers() 
    except Exception, error: 
     logging.error(u"%s", error) 
     worker._cancelAllConsumers() 
+0

Может быть, не то, что вы ищете, но есть большая библиотека Python называется сельдереем, который можно использовать вместо написания собственной реализации очередей. –

+0

Спасибо за указатель. Сельдерей выглядит интересным для сложных задач, используя веб-службы и базы данных. Для моей конкретной задачи продюсер читает строки из файла и выполняет некоторые базовые структурные синтаксические разборки и передает данные потребителям - поэтому в основном интенсивная работа ввода-вывода. Потребители обрабатывают данные, занимающиеся интенсивной работой ЦП. Поскольку все это происходит в памяти на той же машине, стандартная очередь Python выглядит нормально. – roskakori

ответ

0

Поскольку ответы до сих пор давали хорошие подсказки, но мне не хватало рабочего кода, я взял код из своего вопроса и завернул его в библиотеку, которая доступна от http://pypi.python.org/pypi/proconex/. Исходный код можно найти по адресу https://github.com/roskakori/proconex. Хотя интерфейс чувствует себя разумным, в реализации по-прежнему используется опрос, поэтому вклады приветствуются.

Любое исключение в потоке производителя или потребителя ререйзируется в основном потоке. Просто убедитесь, что вы используете инструкцию with или finally:worker.close(), чтобы убедиться, что все потоки закрыты правильно.

Вот небольшой пример для производителя с двумя потребителями для целых чисел:

import logging 
import proconex 

class IntegerProducer(proconex.Producer): 
    def items(self): 
     for item in xrange(10): 
      logging.info('produce %d', item) 
      yield item 

class IntegerConsumer(proconex.Consumer): 
    def consume(self, item): 
     logging.info('consume %d with %s', item, self.name) 

if __name__ == '__main__': 
    logging.basicConfig(level=logging.INFO) 
    producer = IntegerProducer() 
    consumer1 = IntegerConsumer('consumer1') 
    consumer2 = IntegerConsumer('consumer2') 

    with proconex.Worker(producer, [consumer1, consumer2]) as worker: 
     worker.work() 
2

Вам нужна очередь с методом отмены, который очищает внутреннюю очередь, устанавливает отмененный флаг, а затем будит все вверх. Работник просыпается с join(), проверяет отмененный флаг в очереди и действует соответствующим образом. Потребители проснутся от get() и отметят отмененный флаг в очереди и распечатывают ошибку. Тогда вашему потребителю просто нужно будет вызвать метод cancel() в случае исключения.

К сожалению, у Python Queue нет метода отмены. Несколько вариантов переход к виду:

  • Ролла своих собственным очередям (может быть сложны, чтобы получить это право)
  • Продлить очередь питона и добавить отменить метод (пары коды для внутренней реализации очереди Python класс)
  • Прокси класс очереди и перегрузка соединяются/получаются с вашей оживленной логикой ожидания (все еще взломанный режим ожидания, но ограничивает его одним пятном и очищает код производителя/потребителя)
  • Найти другую реализацию очереди/библиотека
+0

Да, перемещение логики отмены в очередь, безусловно, очистит рабочий код. Учитывая мои требования, очередь также должна быть способна запомнить возможную информацию об исключении, потому что я хочу, чтобы потребители сообщали об ошибке работнику, а не просто печатали его. Но это, безусловно, можно сделать. Кто-нибудь знает о существующей реализации такой очереди? – roskakori

Смежные вопросы