2015-02-05 4 views
3

Я хочу иметь одного производителя, многопользовательскую архитектуру в Python при выполнении многопоточного программирования. Я хотел бы иметь операцию так:Single Producer Multiple Consumer

  1. Производитель производит данные
  2. Потребители 1 ..n (N предопределено) ждать данных, чтобы прибыть (блок), а затем обработать те же данные в различные пути.

Поэтому мне нужны все потребители, чтобы получить те же данные от производителя.

Когда я использовал Queue для выполнения этого, я понял, что все, кроме первого потребителя, будут голодать с реализацией, которую я имею.

Одним из возможных решений является наличие уникальной очереди для каждого из потребительских потоков, при котором одни и те же данные передаются в несколько очередей производителем. Есть лучший способ сделать это ?

from threading import Thread 
import time 
import random 
from Queue import Queue 

my_queue = Queue(0) 

def Producer(): 
    global my_queue 
    my_list = [] 
    for each in range (50): 
     my_list.append(each) 
    my_queue.put(my_list) 

def Consumer1(): 
    print "Consumer1" 
    global my_queue 
    print my_queue.get() 
    my_queue.task_done() 

def Consumer2(): 
    print "Consumer2" 
    global my_queue 
    print my_queue.get() 
    my_queue.task_done() 


P = Thread(name = "Producer", target = Producer) 

C1 = Thread(name = "Consumer1", target = Consumer1) 

C2 = Thread(name = "Consumer2", target = Consumer2) 


P.start() 

C1.start() 

C2.start() 

В приведенном выше примере C2 блокируется на неопределенный срок, поскольку C1 потребляет данные, полученные с помощью P1. Я бы предпочел, чтобы для C1 и C2 оба были доступны для получения ТОЛЬКО данных, созданных P1.

Спасибо за любой код/​​указатели!

ответ

1

Ваш производитель создает только один работу делать:

my_queue.put(my_list) 

Например, положить my_list дважды, и оба потребители работают:

def Producer(): 
    global my_queue 
    my_list = [] 
    for each in range (50): 
     my_list.append(each) 
    my_queue.put(my_list) 
    my_queue.put(my_list) 

Так этот путь вы положили два рабочих места в очередь, с тот же список.

Однако я должен предупредить вас: изменить одни и те же данные в разных потоках без синхронизации потоков, как правило, плохая идея.

В любом случае подход с одной очередью не будет работать для вас, так как одна очередь должна обрабатываться потоками с тем же алгоритмом.

Итак, я советую вам выбрать уникальную очередь для каждого потребителя, так как другие решения не так тривиальны.

+0

Что помешало бы одному и тому же потребителю получить обе копии данных очереди? – martineau

+0

@martineau спасибо, что указал. Ничего в действительности. Обновленный ответ. –

+0

Хм даже с вашим обновленным ответом я все еще не понимаю, как вы можете справиться с проблемой @martineau? Вы имели в виду, что есть некоторые нетривиальные решения? Не могли бы вы упомянуть их? – max

1

Как насчет очереди в потоке?

В качестве части запуска каждого потребителя вы также создадите очередную очередь и добавьте это в список «все очереди потоков». Затем запустите продюсер, передав ему список всех очередей, которые он может затем передать данные во всех них.

+0

OP сказал: «Одно из возможных решений заключается в том, чтобы иметь уникальную очередь для каждого из потребительских потоков, где одни и те же данные вносятся в несколько очередей производителем. Есть ли лучший способ сделать это?» - поэтому, похоже, ищет другой подход. – martineau

+0

@martineau Lol, полностью пропустил это. –

Смежные вопросы