2014-10-16 2 views
1

У меня есть сценарий питона, где в верхней части файла, у меня есть:Моя очередь пуста после экземпляров multiprocessing.Process закончить

result_queue = Queue.Queue() 
key_list = *a large list of small items* #(actually from bucket.list() via boto) 

я узнал, что Очереди процесса безопасных структуры данных. У меня есть метод:

def enqueue_tasks(keys): 
    for key in keys: 
     try: 
      result = perform_scan.delay(key) 
      result_queue.put(result) 
     except: 
      print "failed" 

perform_scan.delay() функция здесь на самом деле вызывает сельдерей рабочий, но я не думаю, что имеет отношения (это асинхронный вызов процесса).

У меня также есть:

def grouper(iterable, n, fillvalue=None): 
    args = [iter(iterable)] * n 
    return izip_longest(fillvalue=fillvalue, *args) 

Наконец у меня есть main() функцию:

def main(): 

    executor = concurrent.futures.ProcessPoolExecutor(10) 
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)] 
    concurrent.futures.wait(futures) 
    print len(result_queue) 

Результат от оператора печати 0. Тем не менее, если я включаю печать заявление о размере result_queue в enqueue_tasks, пока программа запущена, я вижу, что размер увеличивается, и вещи добавляются в очередь.

Идеи того, что происходит?

+1

Если 'perform_scan.delay()' является асинхронным удаленным вызовом, который предположительно означает, что он не обрабатывает и просто ждет ответа, почему вы используете процессы вместо потоков в первую очередь? – abarnert

ответ

1

Похоже, что есть более простое решение этой проблемы.

Вы строите список фьючерсов. Весь смысл фьючерсов в том, что они будущие результаты. В частности, независимо от того, что возвращает каждая функция, это (возможное) значение будущего. Итак, не делайте все «толкать результаты в очередь» вообще, просто возвращайте их из функции задачи и выбирайте их из фьючерсов.


Самый простой способ сделать это, чтобы разорвать этот цикл так, что каждая клавиша является отдельной задачей, с отдельным будущем. Я не знаю, что ли подходит для вашего реального кода, но если это:

def do_task(key): 
    try: 
     return perform_scan.delay(key) 
    except: 
     print "failed" 

def main(): 
    executor = concurrent.futures.ProcessPoolExecutor(10) 
    futures = [executor.submit(do_task, key) for key in key_list] 
    # If you want to do anything with these results, you probably want 
    # a loop around concurrent.futures.as_completed or similar here, 
    # rather than waiting for them all to finish, ignoring the results, 
    # and printing the number of them. 
    concurrent.futures.wait(futures) 
    print len(futures) 

Конечно, это не делает группировку. Но вам это нужно?

Наиболее вероятной причиной необходимости группировки является то, что задачи настолько малы, что накладные расходы при планировании их (и травления входов и выходов) болотируют фактическую работу. Если это правда, вы можете почти наверняка подождать, пока не будет выполнена целая партия, чтобы возвращать какие-либо результаты. Особенно учитывая, что вы даже не смотрите на результаты, пока все не закончите. (Эта модель «разбита на группы, обрабатывает каждую группу, объединяется вместе», довольно распространена в таких случаях, как численная работа, где каждый элемент может быть крошечным, или элементы могут не быть независимыми друг от друга, но существуют группы, которые являются большими достаточно или независимы от остальной части работы.)

Во всяком случае, это почти так же просто:

def do_tasks(keys): 
    results = [] 
    for key in keys: 
     try: 
      result = perform_scan.delay(key) 
      results.append(result) 
     except: 
      print "failed" 
    return results 

def main(): 
    executor = concurrent.futures.ProcessPoolExecutor(10) 
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)] 
    print sum(len(results) for results in concurrent.futures.as_completed(futures)) 

Или, если вы предпочитаете первый ждать, а затем рассчитать:

def main(): 
    executor = concurrent.futures.ProcessPoolExecutor(10) 
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)] 
    concurrent.futures.wait(futures) 
    print sum(len(future.result()) for future in futures) 

Но опять же, я сомневаюсь, что вам нужно, даже это.

+0

Мои глаза были открыты. Я даже не знаю, почему я не думал о фьючерсах как о простом понимании списка - это полностью побеждает цель моей очереди посредников, как вы заявляли (строя другой комментарий к вашему комментарию). Я также выбрал группировку - я думал о том, чтобы группировать неправильный путь. Кажется, мне это не нужно, так как хотя мои индивидуальные задачи очень малы, я не смотрю ни на кого из них до конца. Одна вещь, которую я не понимаю, - «future.result». Я думал, что будущее является фактическим результатом, является ли будущее его собственным объектом? – jeffrey

+1

@jeffrey: Да, ['Future'] (https://docs.python.org/3/library/concurrent.futures.html#future-objects) - это объект, который содержит результат, который может быть недоступен еще , Можно создать модель параллелизма целого языка вокруг неявных фьючерсов (см. AliceML или любой язык актера или потока данных), что может быть очень круто, но это не вписывается в Python, поэтому вместо этого есть явные фьючерсы. См. [Википедия] (http://en.wikipedia.org/wiki/Futures_and_promises) для дальнейшего обсуждения. – abarnert

2

Вам необходимо использовать multiprocessing.Queue, а не Queue.Queue. Queue.Queue is поточно-безопасный, не является безопасным процессом, поэтому изменения, которые вы вносите в него в один процесс, не отражаются ни в каких других.

+0

Также обратите внимание, что я упомянул об этом в [комментарии] (http://stackoverflow.com/questions/26409865/can-i-have-two-multithreaded-functions-running-at-the-same-time#comment41469218_26410020) в ваш предыдущий вопрос по этой теме. Прошу прощения, если не ясно, что я имел в виду. – dano

+0

Я ценю помощь! Невозможно было понять, что я не дал контекста моей проблемы. – jeffrey