2013-09-22 2 views
1

Вот пример. У меня есть один производитель и несколько потребителей.Сообщите потребителям прекратить ожидание элементов очереди

#!/usr/bin/env python2 

from multiprocessing import Process, Queue 
import time 

def counter(low, high): 
    current = low 
    while current <= high: 
     yield current 
     current += 1 

def put_tasks(q): 
    for c in counter(0, 9): 
     q.put(c) 
     time.sleep(.1) 
    print('put_tasks: no more tasks') 

def work(id, q): 
    while True: 
     task = q.get() 
     print('process %d: %s' % (id, task)) 
     time.sleep(.3) 
    print('process %d: done' % id) 

if __name__ == '__main__': 
    q = Queue(2) 
    task_gen = Process(target=put_tasks, args=(q,)) 
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)] 

    task_gen.start() 
    for p in processes: 
     p.start() 
    for p in processes: 
     p.join() 

counter просто номер генератора для put_tasks. Как правило, у меня было бы несколько тысяч задач вместо 10, как в этом примере. Точка этого кода состоит в том, чтобы поэтапно кормить очередь задачами.

Проблема заключается в том, что потребители не могут заранее знать, сколько задач им придется обрабатывать, но функция put_tasks знает, когда это будет сделано (тогда она печатает no more tasks).

Пример вывода:.

process 2: 0 
process 0: 1 
process 1: 2 
process 2: 3 
process 0: 4 
process 1: 5 
process 2: 6 
process 0: 7 
process 1: 8 
process 2: 9 
put_tasks: no more tasks 

Все задачи обрабатываются, но программа затем виснет (каждый процесс застревает на q.get() Я хотел бы, чтобы это прекратить, когда все задачи были обработаны без ущерба для скорости и безопасности (нет некрасивых таймауты).

Любые идеи?

+1

Вы должны заменить 'counter()' на 'xrange()' (или 'range()', если вы находитесь на Python 3). –

+0

Кроме того, если требуется бесконечный счетчик (предполагая, что вы изначально не выбрали 'range'), см.' Itertools.count'. – Veedrac

+0

@ErikAllik Да, в этом точном случае было бы лучше. Я хотел бы указать, что это может быть любой конечный генератор, дающий данные для обработки. – bbc

ответ

2

Я предлагаю поставить значение дозорного положить на конец очереди

def put_tasks(q): 
    ... 

    print('put_tasks: no more tasks') 
    q.put(end_of_queue) 

def work(id, q): 
    while True: 
     task = q.get() 

     if task == end_of_queue: 
      q.put(task) 
      print("DONE") 
      return 

     print('process %d: %s' % (id, task)) 
     time.sleep(.1) 
    print('process %d: done' % id) 

class Sentinel: 
    def __init__(self, id): 
     self.id = id 

    def __eq__(self, other): 
     if isinstance(other, Sentinel): 
      return self.id == other.id 

     return NotImplemented 

if __name__ == '__main__': 
    q = Queue(2) 
    end_of_queue = Sentinel("end of queue") 
    task_gen = Process(target=put_tasks, args=(q,)) 
    processes = [Process(target=work, args=(id, q)) for id in range(0, 3)] 
    ... 

Я, кажется, не быть в состоянии использовать object() в качестве дозорных, поскольку потоки, кажется, имеют доступ к различным экземплярам , поэтому они не сравниваются равными.

Если вы когда-нибудь хотели, чтобы генерировать случайные сторожей вы можете использовать uuid модуль для генерации случайных идентификаторов:

import uuid 

class Sentinel: 
    def __init__(self): 
     self.id = uuid.uuid4() 

    def __eq__(self, other): 
     if isinstance(other, Sentinel): 
      return self.id == other.id 

     return NotImplemented 

Наконец, ПНЕТ используется None для часового, который вполне достаточно тех пор, пока очередь не может иметь None в. Метод дозорного будет работать в основном произвольных аргументах.

+0

Спасибо, он отлично смотрится, но я не думаю, что он обрабатывает мой случай с несколькими потребителями. – bbc

+0

Почему бы и нет? Меня устраивает. – Veedrac

+0

Я думаю, что первый потребитель будет потреблять * one * 'end_of_queue' рабочий, помещенный туда, а остальные ничего не получат. Поэтому вам нужно больше 'end_of_queue'. – Cucu

3

Самый простой способ заключается в добавлении в очередь то, что говорит потребителям все работа.

number_of_consumers = 3 

def put_tasks(q): 
    for c in counter(0, 9): 
     q.put(c) 
     time.sleep(.1) 
    print('put_tasks: no more tasks') 
    for i in range(number_of_consumers): 
     q.put(None) 

def work(id, q): 
    while True: 
     task = q.get() 
     if task is None: 
      break 
     print('process %d: %s' % (id, task)) 
     time.sleep(.3) 
    print('process %d: done' % id) 
0

Недавно я смотрел в тот же самый вопрос и нашел альтернативный ответ на вышеизложенное, в документации Python

Похоже, «правильный» способ сделать это с помощью метода Queue.task_done(), а именно:

def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
Смежные вопросы