2015-11-18 4 views
1

Я использую класс Python multiprocessing.JoinableQueue, и я пытаюсь наложить ограничение размера на очередь. Если очередь заполнена до этого предела, цикл будет спать и попытаться повторно добавить задачу, когда пространство в очереди освободится, но я не могу найти надежный способ отслеживания размера очереди.Определить, сколько элементов находится в Python JoinableQueue

Я думал об использовании какой-то логики, как это, только чтобы выяснить функцию .qsize() я ожидал от Queue модуля не существует:

from multiprocessing import JoinableQueue 
QUEUE_SIZE = 50 
QUEUE_WAIT = 900 
task_queue = JoinableQueue(QUEUE_SIZE) 
.... 
if QUEUE_SIZE is not 0: 
    # if QUEUE_SIZE is zero, there is no limit on the queue 
    while True: 
     # if the size of the queue equals our self-imposed limit, wait to try and add this task 
     if task_queue.qsize() == QUEUE_SIZE: 
      print 'task queue limit is met. task will be added when space clears' 
      time.sleep(QUEUE_WAIT) 
     else: 
      # add the task if we can 
      self.task_queue.put(path) 
      print 'task queued" task="%s"' % path) 
      break 

    else: 
     # if there's no limit just add the file_path 
     self.task_queue.put(file_path) 

Есть ли предпочтительный способ отследить, сколько пунктов в настоящее время в JoinableQueue или, возможно, лучший способ повторить попытку добавления элементов в очередь, если они не могут быть добавлены сразу? Может быть, всего лишь try/except/sleep внутри петли? Однако это не лучший вариант.

Любой бы очень ценится :)

+0

«try/except/sleep» в цикле, вероятно, самый подходящий способ сделать это (хотя я должен признать, что раньше я не использовал JoinableQueue). Однако отслеживание размера чего-то, к которому обращаются из нескольких потоков вне любой блокировки, которую выполняет JoinableQueue, кажется немного раздражительным. Вам все равно придется обрабатывать случай «except», так как могут быть другие причины, по которым очередь не примет ваш элемент. –

ответ

1

JoinableQueue должен иметь .full() метод, который вы должны быть в состоянии использовать, чтобы определить, имеет ли очередь место для новых элементов. Использование full() вместо qsize() означает, что вы можете избежать необходимости отслеживать максимальный размер очереди отдельно.

Однако я бы не стал использовать это, так как он будет ненадежным так же, как .qsize(). Очередь может быть средней модификацией при ее чтении, поэтому вам придется обрабатывать случай исключения в любом случае. Использование try....except внутри цикла со сном, вероятно, является самым ясным, безопасным и наиболее практичным способом достижения того, что вы хотите попробовать.

Обертывание это в вспомогательной функции может сделать код проще (вы должны изменить это, чтобы обрабатывать аргументы func или иметь обернуть вызов без аргументов лямбда перед передачей его try_until().

def try_until(func, max_tries, sleep_time): 
    for _ in range(0,max_tries): 
     try: 
      return func() 
     except: 
      sleep(sleep_time) 
    raise WellNamedException() 
+0

Perf ect, спасибо! Я знаю, что '.qsize()', к сожалению, немного непредсказуем. Я также вхожу в поставленные в очередь элементы (которые являются файловыми путями) в коллекцию MongoDB, чтобы избежать повторного добавления их, когда каталог проверен снова, поэтому, возможно, я мог бы отслеживать количество пунктов в очереди, проверяя коллекцию Mongo, используя это обертка? – deadbits

Смежные вопросы