2011-12-16 2 views
3

Мой сценарий заключается в следующем:Восстановление потерял multiprocessing.Queue пунктов, когда рабочий процесс умирает

  • У меня есть рабочий, который ставит в очередь задачи в multiprocessing.Queue(), если указанный пуст. Это делается для того, чтобы выполнение задач соответствовало определенному приоритету и многопроцессорности. Queue() не выполняет приоритеты.
  • Существует множество рабочих, которые появляются из mp.Queue и делают некоторые вещи. Иногда (< 0,1%) они терпят неудачу и умирают, не имея возможности повторно выполнить задание.
  • Мои задачи заблокированы через центральную базу данных и могут выполняться только один раз (жесткое требование). Для этого у них есть определенные состояния, которые они могут переходить из/в.

Мое текущее решение: Пусть все сотрудники ответят через другую очередь, задачи которой были завершены, и ввести предельный срок, с помощью которого задача должна быть выполнена. Сбросьте задачу и повторно заклейте ее, если срок достигнут. У этого есть проблема, что решение является «мягким», т. Е. Крайний срок является произвольным.

Я ищу простейшее возможное решение. Есть ли более простое или более строгое решение?

+0

Можете ли вы изменить работника, чтобы при выполнении задачи рабочий сам повторно задал задачу? – unutbu

+0

Что происходит, когда задача выходит из строя? Исключение Python, которое вы могли бы поймать? –

+0

@unutbu Это уже так, если они терпят неудачу, они перезаписываются. Однако они могут потерпеть неудачу по другим причинам, без того, чтобы рабочий контролировал их (segfault и т. Д.). –

ответ

3

Это решение использует три очереди, чтобы следить за работой (моделируемых в WORK_ID):

  • todo_q: Любые работы предстоит сделать (в том числе быть переделано, если процесс умер в полете)
  • start_q: Любая работа, которая была начата в процессе
  • finish_q: Любая работа, которая была завершена

Us При использовании этого метода вам не нужен таймер. Пока вы назначаете идентификатор процесса и отслеживаете присвоения, проверьте, есть ли Process.is_alive(). Если процесс скончался, добавьте эту работу обратно в очередь todo.

В приведенном ниже коде я имитировать рабочий процесс умирающую 25% времени ...

from multiprocessing import Process, Queue 
from Queue import Empty 
from random import choice as rndchoice 
import time 

def worker(id, todo_q, start_q, finish_q): 
    """multiprocessing worker""" 
    msg = None 
    while (msg!='DONE'): 
     try: 
      msg = todo_q.get_nowait() # Poll non-blocking on todo_q 
      if (msg!='DONE'): 
       start_q.put((id, msg)) # Let the controller know work started 
       time.sleep(0.05) 
       if (rndchoice(range(3))==1): 
        # Die a fraction of the time before finishing 
        print "DEATH to worker %s who had task=%s" % (id, msg) 
        break 
       finish_q.put((id, msg)) # Acknowledge work finished 
     except Empty: 
      pass 
    return 

if __name__ == '__main__': 
    NUM_WORKERS = 5 
    WORK_ID = set(['A','B','C','D','E']) # Work to be done, you will need to 
            # name work items so they are unique 
    WORK_DONE = set([])    # Work that has been done 
    ASSIGNMENTS = dict()   # Who was assigned a task 
    workers = dict() 
    todo_q = Queue() 
    start_q = Queue() 
    finish_q = Queue() 

    print "Starting %s tasks" % len(WORK_ID) 
    # Add work 
    for work in WORK_ID: 
     todo_q.put(work) 

    # spawn workers 
    for ii in xrange(NUM_WORKERS): 
     p = Process(target=worker, args=(ii, todo_q, start_q, finish_q)) 
     workers[ii] = p 
     p.start() 

    finished = False 
    while True: 
     try: 
      start_ack = start_q.get_nowait() # Poll for work started 
      ## Check for race condition between start_ack and finished_ack 
      if not ASSIGNMENTS.get(start_ack[0], False): 
       ASSIGNMENTS[start_ack[0]] = start_ack # Track the assignment 
       print "ASSIGNED worker=%s task=%s" % (start_ack[0], 
        start_ack[1]) 
       WORK_ID.remove(start_ack[1])  # Account for started tasks 
      else: 
       # Race condition. Never overwrite existing assignments 
       # Wait until the ASSIGNMENT is cleared 
       start_q.put(start_ack) 
     except Empty: 
      pass 

     try: 
      finished_ack = finish_q.get_nowait() # Poll for work finished 
      # Check for race condition between start_ack and finished_ack 
      if (ASSIGNMENTS[finished_ack[0]][1]==finished_ack[1]): 
       # Clean up after the finished task 
       print "REMOVED worker=%s task=%s" % (finished_ack[0], 
        finished_ack[1]) 
       del ASSIGNMENTS[finished_ack[0]] 
       WORK_DONE.add(finished_ack[1]) 
      else: 
       # Race condition. Never overwrite existing assignments 
       # It was received out of order... wait for the 'start_ack' 
       finish_q.put(finished_ack) 
      finished_ack = None 
     except Empty: 
      pass 

     # Look for any dead workers, and put their work back on the todo_q 
     if not finished: 
      for id, p in workers.items(): 
       status = p.is_alive() 
       if not status: 
        print " WORKER %s FAILED!" % id 
        # Add to the work again... 
        todo_q.put(ASSIGNMENTS[id][1]) 
        WORK_ID.add(ASSIGNMENTS[id][1]) 
        del ASSIGNMENTS[id]  # Worker is dead now 
        del workers[id] 
        ii += 1 
        print "Spawning worker number", ii 
        # Respawn a worker to replace the one that died 
        p = Process(target=worker, args=(ii, todo_q, start_q, 
         finish_q)) 
        workers[ii] = p 
        p.start() 
     else: 
      for id, p in workers.items(): 
       p.join() 
       del workers[id] 
      break 

     if (WORK_ID==set([])) and (ASSIGNMENTS.keys()==list()): 
      finished = True 
      [todo_q.put('DONE') for x in xrange(NUM_WORKERS)] 
     else: 
      pass 
    print "We finished %s tasks" % len(WORK_DONE) 

Запуск этого на моем ноутбуке ...

[email protected]:~$ python queueack.py 
Starting 5 tasks 
ASSIGNED worker=2 task=C 
ASSIGNED worker=0 task=A 
ASSIGNED worker=4 task=B 
ASSIGNED worker=3 task=E 
ASSIGNED worker=1 task=D 
DEATH to worker 4 who had task=B 
DEATH to worker 3 who had task=E 
    WORKER 3 FAILED! 
Spawning worker number 5 
    WORKER 4 FAILED! 
Spawning worker number 6 
REMOVED worker=2 task=C 
REMOVED worker=0 task=A 
REMOVED worker=1 task=D 
ASSIGNED worker=0 task=B 
ASSIGNED worker=2 task=E 
REMOVED worker=2 task=E 
DEATH to worker 0 who had task=B 
    WORKER 0 FAILED! 
Spawning worker number 7 
ASSIGNED worker=5 task=B 
REMOVED worker=5 task=B 
We finished 5 tasks 
[email protected]:~$ 

Я испытал это с более 10000 рабочих предметов с коэффициентом смертности на 25%.

+0

Звучит многообещающе. Я собираюсь попробовать это. –

+0

Я немного переписал его. Теперь я использую только 2 очереди, по одному в каждом направлении. На стороне результата я добавил небольшой протокол (я возвращаю 3-кортеж), который сигнализирует, что произошло. Таким образом, я остаюсь расширяемым, и он добавил всего около 20 строк кода. В любом случае: первый тест в реальном мире завершен, и он отлично работает. –

Смежные вопросы