Приложение представляет собой потоковый разделитель - процесс-производитель получает поток данных и несколько процессов-клиентов (клиентские соединения) отправляет (передает) полученные данные подключенному клиенту.Передача переменной от процесса производителя ко всем потребительским процессам
Я нашел это sample code для Condition Variable
(он использует multithreading
, но он должен работать на multiprocessing
также) и реструктурировать его, чтобы он не всплывал элемент в процессе потребителя. Вот как я ожидал, что другой потребительский процесс сможет повторно использовать его и повторно отправить те же данные подключенным клиентам. Как только все потребительские процессы завершат отправку, я удаляю item[0]
из массива буфера. Но это не работает, поскольку процессы не выполняются в предсказуемом порядке.
1. Receive new data - Producer process
2. Send received data - Consumer process [1]
3. Send received data - Consumer process [2]
...
n. Send received data - Consumer process [n]
Loop everything.
Обычно бывает, что процесс производителя удаляет item[0]
, прежде чем все Потребительские процессы получения для получения item[0]
и отправить его.
Я думаю, что одним из возможных решений было бы использовать отдельный Queue()
для каждого потребительского процесса и в процессе производства для заполнения этих отдельных очередей.
Можно ли использовать Event()
, чтобы уведомить потребительский процесс о прибытии новых данных и передать эти данные независимо от других потребительских процессов без очереди?
Если использование очереди является лучшим решением, можно использовать только одну очередь и сохранять новые данные до тех пор, пока все потребительские процессы не отправят ее?
Я открыт для любых предложений, так как я не уверен, что это лучший способ сделать это.
import threading
import time
# A list of items that are being produced. Note: it is actually
# more efficient to use a collections.deque() object for this.
items = []
# A condition variable for items
items_cv = threading.Condition()
# A producer thread
def producer():
print "I'm the producer"
for i in range(30):
with items_cv: # Always must acquire the lock first
items.append(i) # Add an item to the list
items_cv.notify() # Send a notification signal
time.sleep(1)
items.pop(0) # Pop item remove it from the "buffer"
# A consumer thread
def consumer():
print "I'm a consumer", threading.currentThread().name
while True:
with items_cv: # Must always acquire the lock
while not items: # Check if there are any items
items_cv.wait() # If not, we have to sleep
# x = items.pop(0) # Pop an item off
x = items[0]
print threading.currentThread().name,"got", x
time.sleep(5)
# Launch a bunch of consumers
cons = [threading.Thread(target=consumer)
for i in range(10)]
for c in cons:
c.setDaemon(True)
c.start()
# Run the producer
producer()
Просто быстрое наблюдение. Если я правильно вас понимаю, вы хотите, чтобы все потребители закончили на позиции 0, прежде чем кто-либо из них начнет с пункта 1, что сделает любого потребителя способным заблокировать все остальные. То есть в моем опыте что-то обычно хочет избежать. Отдельная очередь для каждого потребителя позволит избежать этой проблемы. –
Что вы подразумеваете под словом «После того, как все потребительские процессы завершатся, я удаляю элемент [0] из массива буфера». Похоже, что потребитель ничего не посылает; просто напечатайте номер/элемент – Pandrei
@ViktorFougstedt. Вы правы ... Я хочу заставить всех потребителей закончить отправку 'item [0]', прежде чем кто-либо из них начнет отправлять 'item [1]'. В идеальном случае мне никогда не придется использовать 'item [1]', поскольку после того, как все потребители заканчивают 'item [0]', я просто удалю его и добавлю новое (только что полученное) значение в массив элементов. @Pandrei Это просто пример из данной ссылки. Вместо этого в процессе Потребителя у меня есть клиент, через который я отправляю 'item [0]'. – sstevan