Итак, у меня есть система с производителем, а потребитель подключен к очереди неограниченного размера, но если потребитель неоднократно набирает до тех пор, пока не будет сброшено пустое исключение, оно не очистит очередь ,Изменение размера буфера при многопроцессорной обработке.
Я считаю, что это связано с тем, что поток в очереди на стороне потребителя, который сериализует объекты в сокет, блокируется после заполнения буфера сокета, и поэтому он ждет, пока буфер не будет иметь место, однако это возможно, для потребителя вызвать «слишком быстро», и поэтому он считает, что очередь пуста, когда на самом деле поток с другой стороны имеет гораздо больше данных для отправки, но просто не может сериализовать его достаточно быстро, чтобы предотвратить появление сокета, ,
Я считаю, что эта проблема была бы облегчена, если бы я мог изменить размер буфера в базовом сокете (я основан на Windows). Насколько я могу видеть то, что мне нужно сделать, это что-то вроде:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q
Если я выше, то это значит, что когда multirprocssing инициализирует очередь он будет использовать новый размер буфера, который я поставил в версию многопроцессорных соединений, которые я уже импортировал? Это верно?
Также я полагаю, что это повлияет только на окна, поскольку BUFSIZE не используется на Linux-машинах, поскольку по умолчанию их все сокеты установлены на 60 килобайт?
Кто-нибудь пробовал это раньше? Будет ли это иметь побочные эффекты на окнах? И каковы основные ограничения на размеры буфера сокетов на окнах?
=================== пример кода, чтобы продемонстрировать ===================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep
total_length = 10**8
def supplier(q):
print "Starting feeder"
for i in range(total_length) :
q.put(i)
if __name__=="__main__":
queue = mp.Queue()
p = mp.Process(target=supplier, args=(queue,))
p.start()
sleep(120)
returned = []
while True :
try :
returned.append(queue.get(block=False))
except Empty :
break
print len(returned)
print len(returned) == total_length
p.terminate()
sys.exit()
Этот образец, когда он запускается в окнах, обычно вытягивает около 160 000 предметов из очереди, потому что основной поток может освобождать буфер быстрее, чем он заправляется поставщиком, и в конечном итоге он пытается вытащить из очереди, когда буфер пуст и сообщает, что он пуст.
Вы можете теоретически улучшить эту проблему, имея больший размер буфера. Две строки наверху, я полагаю, в системе Windows увеличивают размер буфера по умолчанию для канала.
Если вы прокомментируете их, тогда этот скрипт вытащит больше данных, прежде чем он выйдет, так как он намного выше. Мои основные вопросы: 1) Это действительно работает. 2) Есть ли способ сделать этот код одним и тем же размером базового буфера в окнах и linux 3) Есть ли неожиданные побочные эффекты от установки больших размеров буфера для труб.
Я знаю, что в целом нет способа узнать, вы ли вытащили все данные из очереди (- при условии, что поставщик работает постоянно и производит данные очень неравномерно), но я ищу способы улучшить это на основе наилучших усилий.
Я думаю, что небольшой пример, демонстрирующий эту проблему, был бы полезен. – pradyunsg