Вот пример. У меня есть один производитель и несколько потребителей.Сообщите потребителям прекратить ожидание элементов очереди
#!/usr/bin/env python2
from multiprocessing import Process, Queue
import time
def counter(low, high):
current = low
while current <= high:
yield current
current += 1
def put_tasks(q):
for c in counter(0, 9):
q.put(c)
time.sleep(.1)
print('put_tasks: no more tasks')
def work(id, q):
while True:
task = q.get()
print('process %d: %s' % (id, task))
time.sleep(.3)
print('process %d: done' % id)
if __name__ == '__main__':
q = Queue(2)
task_gen = Process(target=put_tasks, args=(q,))
processes = [Process(target=work, args=(id, q)) for id in range(0, 3)]
task_gen.start()
for p in processes:
p.start()
for p in processes:
p.join()
counter
просто номер генератора для put_tasks
. Как правило, у меня было бы несколько тысяч задач вместо 10, как в этом примере. Точка этого кода состоит в том, чтобы поэтапно кормить очередь задачами.
Проблема заключается в том, что потребители не могут заранее знать, сколько задач им придется обрабатывать, но функция put_tasks
знает, когда это будет сделано (тогда она печатает no more tasks
).
Пример вывода:.
process 2: 0
process 0: 1
process 1: 2
process 2: 3
process 0: 4
process 1: 5
process 2: 6
process 0: 7
process 1: 8
process 2: 9
put_tasks: no more tasks
Все задачи обрабатываются, но программа затем виснет (каждый процесс застревает на q.get()
Я хотел бы, чтобы это прекратить, когда все задачи были обработаны без ущерба для скорости и безопасности (нет некрасивых таймауты).
Любые идеи?
Вы должны заменить 'counter()' на 'xrange()' (или 'range()', если вы находитесь на Python 3). –
Кроме того, если требуется бесконечный счетчик (предполагая, что вы изначально не выбрали 'range'), см.' Itertools.count'. – Veedrac
@ErikAllik Да, в этом точном случае было бы лучше. Я хотел бы указать, что это может быть любой конечный генератор, дающий данные для обработки. – bbc