Похоже, что есть более простое решение этой проблемы.
Вы строите список фьючерсов. Весь смысл фьючерсов в том, что они будущие результаты. В частности, независимо от того, что возвращает каждая функция, это (возможное) значение будущего. Итак, не делайте все «толкать результаты в очередь» вообще, просто возвращайте их из функции задачи и выбирайте их из фьючерсов.
Самый простой способ сделать это, чтобы разорвать этот цикл так, что каждая клавиша является отдельной задачей, с отдельным будущем. Я не знаю, что ли подходит для вашего реального кода, но если это:
def do_task(key):
try:
return perform_scan.delay(key)
except:
print "failed"
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(do_task, key) for key in key_list]
# If you want to do anything with these results, you probably want
# a loop around concurrent.futures.as_completed or similar here,
# rather than waiting for them all to finish, ignoring the results,
# and printing the number of them.
concurrent.futures.wait(futures)
print len(futures)
Конечно, это не делает группировку. Но вам это нужно?
Наиболее вероятной причиной необходимости группировки является то, что задачи настолько малы, что накладные расходы при планировании их (и травления входов и выходов) болотируют фактическую работу. Если это правда, вы можете почти наверняка подождать, пока не будет выполнена целая партия, чтобы возвращать какие-либо результаты. Особенно учитывая, что вы даже не смотрите на результаты, пока все не закончите. (Эта модель «разбита на группы, обрабатывает каждую группу, объединяется вместе», довольно распространена в таких случаях, как численная работа, где каждый элемент может быть крошечным, или элементы могут не быть независимыми друг от друга, но существуют группы, которые являются большими достаточно или независимы от остальной части работы.)
Во всяком случае, это почти так же просто:
def do_tasks(keys):
results = []
for key in keys:
try:
result = perform_scan.delay(key)
results.append(result)
except:
print "failed"
return results
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
print sum(len(results) for results in concurrent.futures.as_completed(futures))
Или, если вы предпочитаете первый ждать, а затем рассчитать:
def main():
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
concurrent.futures.wait(futures)
print sum(len(future.result()) for future in futures)
Но опять же, я сомневаюсь, что вам нужно, даже это.
Если 'perform_scan.delay()' является асинхронным удаленным вызовом, который предположительно означает, что он не обрабатывает и просто ждет ответа, почему вы используете процессы вместо потоков в первую очередь? – abarnert