2016-11-29 5 views
8

Итак, у меня есть система с производителем, а потребитель подключен к очереди неограниченного размера, но если потребитель неоднократно набирает до тех пор, пока не будет сброшено пустое исключение, оно не очистит очередь ,Изменение размера буфера при многопроцессорной обработке.

Я считаю, что это связано с тем, что поток в очереди на стороне потребителя, который сериализует объекты в сокет, блокируется после заполнения буфера сокета, и поэтому он ждет, пока буфер не будет иметь место, однако это возможно, для потребителя вызвать «слишком быстро», и поэтому он считает, что очередь пуста, когда на самом деле поток с другой стороны имеет гораздо больше данных для отправки, но просто не может сериализовать его достаточно быстро, чтобы предотвратить появление сокета, ,

Я считаю, что эта проблема была бы облегчена, если бы я мог изменить размер буфера в базовом сокете (я основан на Windows). Насколько я могу видеть то, что мне нужно сделать, это что-то вроде:

import multiprocessing.connections as conns 
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows 
import multiprocessing.Queue as q 

Если я выше, то это значит, что когда multirprocssing инициализирует очередь он будет использовать новый размер буфера, который я поставил в версию многопроцессорных соединений, которые я уже импортировал? Это верно?

Также я полагаю, что это повлияет только на окна, поскольку BUFSIZE не используется на Linux-машинах, поскольку по умолчанию их все сокеты установлены на 60 килобайт?

Кто-нибудь пробовал это раньше? Будет ли это иметь побочные эффекты на окнах? И каковы основные ограничения на размеры буфера сокетов на окнах?

=================== пример кода, чтобы продемонстрировать ===================

# import multiprocessing.connection as conn 
# conn.BUFSIZE = 2 ** 19 
import sys 
import multiprocessing as mp 
from Queue import Empty 
from time import sleep 

total_length = 10**8 

def supplier(q): 
    print "Starting feeder" 
    for i in range(total_length) : 
     q.put(i) 


if __name__=="__main__": 

    queue = mp.Queue() 

    p = mp.Process(target=supplier, args=(queue,)) 

    p.start() 

    sleep(120) 

    returned = [] 
    while True : 
     try : 
      returned.append(queue.get(block=False)) 
     except Empty : 
      break 

    print len(returned) 
    print len(returned) == total_length 

    p.terminate() 
    sys.exit() 

Этот образец, когда он запускается в окнах, обычно вытягивает около 160 000 предметов из очереди, потому что основной поток может освобождать буфер быстрее, чем он заправляется поставщиком, и в конечном итоге он пытается вытащить из очереди, когда буфер пуст и сообщает, что он пуст.

Вы можете теоретически улучшить эту проблему, имея больший размер буфера. Две строки наверху, я полагаю, в системе Windows увеличивают размер буфера по умолчанию для канала.

Если вы прокомментируете их, тогда этот скрипт вытащит больше данных, прежде чем он выйдет, так как он намного выше. Мои основные вопросы: 1) Это действительно работает. 2) Есть ли способ сделать этот код одним и тем же размером базового буфера в окнах и linux 3) Есть ли неожиданные побочные эффекты от установки больших размеров буфера для труб.

Я знаю, что в целом нет способа узнать, вы ли вытащили все данные из очереди (- при условии, что поставщик работает постоянно и производит данные очень неравномерно), но я ищу способы улучшить это на основе наилучших усилий.

+1

Я думаю, что небольшой пример, демонстрирующий эту проблему, был бы полезен. – pradyunsg

ответ

6

Update:

полезная ссылка ОКОН трубы для людей, которые нуждаются в ней в будущем (ссылка предоставлена ​​ОП, phil_20686): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

Origianl:

BUFSIZE работает только тогда, когда платформа является win32.

multiprocessing.Queue построен на вершине трубы, если вы измените BUFSIZE, то созданная вами Queue будет использовать обновленное значение. смотри ниже:

class Queue(object): 

    def __init__(self, maxsize=0): 
     if maxsize <= 0: 
      maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX 
     self._maxsize = maxsize 
     self._reader, self._writer = Pipe(duplex=False) 

Когда платформа win32, код трубы будет вызывать следующий код:

def Pipe(duplex=True): 
    ''' 
    Returns pair of connection objects at either end of a pipe 
    ''' 
    address = arbitrary_address('AF_PIPE') 
    if duplex: 
     openmode = win32.PIPE_ACCESS_DUPLEX 
     access = win32.GENERIC_READ | win32.GENERIC_WRITE 
     obsize, ibsize = BUFSIZE, BUFSIZE 
    else: 
     openmode = win32.PIPE_ACCESS_INBOUND 
     access = win32.GENERIC_WRITE 
     obsize, ibsize = 0, BUFSIZE 

    h1 = win32.CreateNamedPipe(
     address, openmode, 
     win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | 
     win32.PIPE_WAIT, 
     1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL 
     ) 

Вы можете видеть, что когда duplex ложна, outbuffer размер 0 и InBuffer размер BUFSIZE.

inbuffer - это количество байтов для резервирования входного буфера. 2 ** 16 = 65536, максимальная сумма байтов может быть записана за одну операцию без блокировки, но емкость размера буфера изменяется в разных системах, она меняется даже в одной и той же системе, поэтому трудно сказать, что сторона эффект, когда вы устанавливаете максимальную величину трубы.

+0

Существует несколько важных/хороших сведений о поведении окон в разделе примечаний: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx Если вы добавите ссылку/резюме, я буду принимать это как ответ. –

+0

@ phil_20686 Я обновил свой ответ и разместил вашу предоставленную ссылку. – haifzhan

+0

Приняли это как ответ, но этикетка переполнения стека FYI всегда заключается в суммировании содержимого ссылки, чтобы избежать гниения ссылок, если веб-сайт перемещается и т. Д. Через несколько лет. благодаря –

Смежные вопросы