2014-10-08 4 views
1

У меня есть набор длительных процессов в типичной настройке «pub/sub» с очередями для связи.Многопроцессорность Python - процесс сторожевого таймера?

Я хотел бы сделать две вещи, и я не могу понять, как выполнить оба одновременно:

  1. добавление/удаление рабочих. Например, я хочу иметь возможность добавлять дополнительных потребителей, если увижу, что размер ожидающих запросов слишком велик.
  2. Watchdog для моих процессов. Я хочу получать уведомление, если кто-либо из моих производителей или потребителей сработает.

можно сделать (2) в изоляции:

try: 
    while True: 
     for process in workers + consumers: 
      if not process.is_alive(): 
       logger.critical("%-8s%s died!", process.pid, process.name) 
     sleep(3) 
except KeyboardInterrupt: 
    # Python propagates CTRL+C to all workers, no need to terminate them 
    logger.warn('Received CTR+C, shutting down') 

Вышеуказанные блоки, что мешает мне делать (1).

Поэтому я решил переместить код в свой собственный процесс.

Это не работает, потому что process.is_alive() работает только для родителя, проверяющего статус своих детей. В этом случае процессы, которые я хочу проверить, будут братьями и сестрами вместо детей.

Я немного озадачен тем, как действовать. Как мой основной процесс может поддерживать изменения в подпроцессах, а также отслеживать подпроцессы?

+0

Вы не можете сделать это напрямую, по крайней мере, не таким образом, чтобы вы могли назвать «читаемый код, который имеет смысл». Чтобы управлять им, вам потребуется уровень абстракции, который распределяет задания для работников, которые могут получать команды для увеличения/уменьшения. Честно говоря, писать довольно сложно, и есть готовые системы, которые это делают, посмотрите на сельдерей. –

+0

@Puciek Я использовал сельдерей на других проектах. Он служит для другого варианта использования (AFAIK) - запуск асинхронных заданий.Я никогда не слышал, чтобы использовать его для управления долгосрочными производителями и потребителями. – knite

+0

Вы можете очень хорошо использовать его для запуска долгосрочных заданий, включая серверы потребителей - все скрипты были созданы равными в конце, просто не забудьте отключить тайм-аут. И он поставляется с функцией автомасштаба, которую вы, похоже, ищете. –

ответ

0

multiprocessing.Pool на самом деле уже установлен сторожевой таймер. Он запускает поток, который проверяет каждые 0,1 секунды, чтобы увидеть, умер ли рабочий. Если у него есть, он начинает новый, чтобы занять свое место:

def _handle_workers(pool): 
    thread = threading.current_thread() 

    # Keep maintaining workers until the cache gets drained, unless the pool 
    # is terminated. 
    while thread._state == RUN or (pool._cache and thread._state != TERMINATE): 
     pool._maintain_pool() 
     time.sleep(0.1) 
    # send sentinel to stop workers 
    pool._taskqueue.put(None) 
    debug('worker handler exiting') 

def _maintain_pool(self): 
    """Clean up any exited workers and start replacements for them. 
    """ 
    if self._join_exited_workers(): 
     self._repopulate_pool() 

Это в основном используется для реализации аргумента maxtasksperchild ключевого слова, и на самом деле проблематичен в некоторых случаях. Если процесс замирает, когда выполняется команда map или apply, и этот процесс находится в середине обработки задачи, связанной с этим вызовом, он никогда не завершится. См. this question для получения дополнительной информации об этом поведении.

Сказанное: если вы просто хотите знать, что процесс умер, вы можете просто создать поток (а не процесс), который контролирует pids всех процессов в пуле, и если pids в списке когда-либо изменения, вы знаете, процесс разбился:

def monitor_pids(pool): 
    pids = [p.pid for p in pool._pool] 
    while True: 
     new_pids = [p.pid for p in pool._pool] 
     if new_pids != pids: 
      print("A worker died") 
      pids = new_pids 
     time.sleep(3) 

Edit:

Если вы катитесь своей собственный Pool реализации, вы можете просто взять реплику от multiprocessing.Pool, и запустить свой код мониторинга в фоновый поток в родительском процессе. Проверки на то, что процессы все еще запущены, быстрые, поэтому потеря времени на фоновый поток, принимающий GIL, должна быть незначительной. Учтите, что сторожевой таймер multiprocessing.Process работает каждые 0,1 секунды! Запуск каждые 3 секунды не должен вызывать никаких проблем.

+0

Я не использую пул, потому что мои продюсеры не выполняют идентичную работу. Но я посмотрю на источник, чтобы узнать, могу ли я одолжить некоторые из того, что он делает - выглядит немного сложно, однако ... – knite

+0

@knite Ну, вы все равно можете использовать для этого пул. Не каждый процесс в «multiprocessing.Pool» должен выполнять идентичную работу. Было бы сложно получить автоматическое масштабирование, которое вы хотите, с помощью 'multiprocessing.Pool'. – dano