2015-12-11 2 views
0

Приложение представляет собой потоковый разделитель - процесс-производитель получает поток данных и несколько процессов-клиентов (клиентские соединения) отправляет (передает) полученные данные подключенному клиенту.Передача переменной от процесса производителя ко всем потребительским процессам

Я нашел это sample code для Condition Variable (он использует multithreading, но он должен работать на multiprocessing также) и реструктурировать его, чтобы он не всплывал элемент в процессе потребителя. Вот как я ожидал, что другой потребительский процесс сможет повторно использовать его и повторно отправить те же данные подключенным клиентам. Как только все потребительские процессы завершат отправку, я удаляю item[0] из массива буфера. Но это не работает, поскольку процессы не выполняются в предсказуемом порядке.

1. Receive new data - Producer process 
2. Send received data - Consumer process [1] 
3. Send received data - Consumer process [2] 
... 
n. Send received data - Consumer process [n] 
Loop everything. 

Обычно бывает, что процесс производителя удаляет item[0], прежде чем все Потребительские процессы получения для получения item[0] и отправить его.

Я думаю, что одним из возможных решений было бы использовать отдельный Queue() для каждого потребительского процесса и в процессе производства для заполнения этих отдельных очередей.

Можно ли использовать Event(), чтобы уведомить потребительский процесс о прибытии новых данных и передать эти данные независимо от других потребительских процессов без очереди?

Если использование очереди является лучшим решением, можно использовать только одну очередь и сохранять новые данные до тех пор, пока все потребительские процессы не отправят ее?

Я открыт для любых предложений, так как я не уверен, что это лучший способ сделать это.

import threading 
import time 

# A list of items that are being produced. Note: it is actually 
# more efficient to use a collections.deque() object for this. 

items = [] 

# A condition variable for items 
items_cv = threading.Condition() 

# A producer thread 
def producer(): 
    print "I'm the producer" 
    for i in range(30): 
     with items_cv:   # Always must acquire the lock first 
      items.append(i)  # Add an item to the list 
      items_cv.notify() # Send a notification signal 
     time.sleep(1) 
     items.pop(0)   # Pop item remove it from the "buffer" 


# A consumer thread 
def consumer(): 
    print "I'm a consumer", threading.currentThread().name 
    while True: 
     with items_cv:   # Must always acquire the lock 
      while not items:  # Check if there are any items 
       items_cv.wait() # If not, we have to sleep 
      # x = items.pop(0)  # Pop an item off 
      x = items[0] 
     print threading.currentThread().name,"got", x 
     time.sleep(5) 


# Launch a bunch of consumers 
cons = [threading.Thread(target=consumer) 
     for i in range(10)] 

for c in cons: 
    c.setDaemon(True) 
    c.start() 

# Run the producer 
producer() 
+0

Просто быстрое наблюдение. Если я правильно вас понимаю, вы хотите, чтобы все потребители закончили на позиции 0, прежде чем кто-либо из них начнет с пункта 1, что сделает любого потребителя способным заблокировать все остальные. То есть в моем опыте что-то обычно хочет избежать. Отдельная очередь для каждого потребителя позволит избежать этой проблемы. –

+0

Что вы подразумеваете под словом «После того, как все потребительские процессы завершатся, я удаляю элемент [0] из массива буфера». Похоже, что потребитель ничего не посылает; просто напечатайте номер/элемент – Pandrei

+0

@ViktorFougstedt. Вы правы ... Я хочу заставить всех потребителей закончить отправку 'item [0]', прежде чем кто-либо из них начнет отправлять 'item [1]'. В идеальном случае мне никогда не придется использовать 'item [1]', поскольку после того, как все потребители заканчивают 'item [0]', я просто удалю его и добавлю новое (только что полученное) значение в массив элементов. @Pandrei Это просто пример из данной ссылки. Вместо этого в процессе Потребителя у меня есть клиент, через который я отправляю 'item [0]'. – sstevan

ответ

0

Самый простой способ решить эту проблему, чтобы иметь Queue на каждого клиента. В моей функции-приемнике (слушателе) у меня есть этот мир кода, который создает буфер/очередь для каждого входящего соединения.

 buffer = multiprocessing.Queue() 
     self.client_buffers.append(buffer) 
     process = multiprocessing.Process(name=procces_name, 
              target=self.stream_handler, 
              args(conn, address, buffer)) 
     process.daemon = True 
     process.start() 

В основной (продюсерской) цепочке каждая очередь заполняется, как только появятся новые данные.

while True: 
    data = sock.recv(2048) 
    if not data: break 
    for buffer in self.client_buffers: 
     buffer.put(data) 

И каждый процесс потребитель отправляет данные независимо

if not buffer.empty(): 
    connection.sendall(buffer.get())