2012-05-15 3 views
10

Я пытаюсь найти максимальный вес около 6,1 миллиарда (пользовательских) предметов, и я хотел бы сделать это с параллельной обработкой. Для моего конкретного приложения есть лучшие алгоритмы, которые не требуют моего итерирования более 6,1 млрд. Предметов, но учебник, который их объясняет, над моей головой, и мой босс хочет, чтобы это было сделано за 4 дня. Я подумал, что у меня есть лучший шанс с фантазийным сервером моей компании и параллельной обработкой. Однако все, что я знаю о параллельной обработке, происходит от чтения Pythondocumentation. То есть я довольно потерян ...Избежать условий гонки в многопроцессорных очередях Python 3

Моя нынешняя теория - это настроить процесс подачи, очередь ввода, целую группу (скажем, 30) рабочих процессов и очередь вывода (найти максимальный элемент в очереди вывода будет тривиальным). То, что я не понимаю, - это то, как процесс фидера может сообщать рабочим процессам, когда нужно прекратить ждать, пока элементы попадут во входную очередь.

Я думал об использовании multiprocessing.Pool.map_async на моем истребителе предметов 6.1E9, но для прохождения итерации предметов требуется почти 10 минут, не делая ничего для них. Если я не понимаю что-то ..., имея map_async итерацию через них, чтобы назначить их процессам, можно было бы сделать, пока процессы начнут свою работу. (Pool также imap но documentation говорит, что это похоже на map, который не появляется на работе асинхронно Я хочу асинхронный, правый.?)

Похожие вопросы: Хочу ли я использовать concurrent.futures вместо multiprocessing ? Я не мог быть первым человеком, который мог бы реализовать систему с двумя очередями (именно так работают линии в каждом гастроном в Америке ...), так есть ли еще Pythonic/встроенный способ сделать это?

Вот скелет того, что я пытаюсь сделать. Смотрите блок комментариев посередине.

import multiprocessing as mp 
import queue 

def faucet(items, bathtub): 
    """Fill bathtub, a process-safe queue, with 6.1e9 items""" 
    for item in items: 
     bathtub.put(item) 
    bathtub.close() 

def drain_filter(bathtub, drain): 
    """Put maximal item from bathtub into drain. 
    Bathtub and drain are process-safe queues. 
    """ 
    max_weight = 0 
    max_item = None 
    while True: 
     try: 
      current_item = bathtub.get() 
     # The following line three lines are the ones that I can't 
     # quite figure out how to trigger without a race condition. 
     # What I would love is to trigger them AFTER faucet calls 
     # bathtub.close and the bathtub queue is empty. 
     except queue.Empty: 
      drain.put((max_weight, max_item)) 
      return 
     else: 
      bathtub.task_done() 
     if not item.is_relevant(): 
      continue 
     current_weight = item.weight 
     if current_weight > max_weight: 
      max_weight = current_weight 
      max_item = current_item 

def parallel_max(items, nprocs=30): 
    """The elements of items should have a method `is_relevant` 
    and an attribute `weight`. `items` itself is an immutable 
    iterator object. 
    """ 
    bathtub_q = mp.JoinableQueue() 
    drain_q = mp.Queue() 

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q)) 
    worker_procs = mp.Pool(processes=nprocs) 

    faucet_proc.start() 
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q) 

    finalists = [] 
    for i in range(nprocs): 
     finalists.append(drain_q.get()) 

    return max(finalists) 


ВОТ ОТВЕТ

Я нашел очень тщательный ответ на мой вопрос, и нежное введение в многозадачной из Python Foundation директор по коммуникациям Даг Hellman. То, что я хотел, было образцом «ядовитой таблетки». Посмотрите здесь: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Подходит для @MRAB для публикации ядра этой концепции.

+0

Почему вы «импортируете очередь», если используете «multiprocessing.Queue»? –

+0

Я использую его, чтобы поймать исключение 'queue.Empty', когда рабочий смотрит на свою входную очередь. Мое задумчивое мышление заключалось в том, что это исключение можно было бы выбросить, если и только если очередь была закрыта, и она также была пустой. Обратите внимание, что при использовании метода @ MRAB в его ответе импорт «очереди» будет невозможен. – wkschwartz

ответ

3

Вы можете поместить в очередь специальный завершающий элемент, например None. Когда работник видит это, он может вернуть его другим работникам, чтобы увидеть, а затем прекратить работу. Кроме того, вы можете поместить в очередь один отдельный завершающий элемент для каждого работника.

+1

Это действительно хороший ответ. Я оставлю этот вопрос открытым дольше, хотя если кто-то может ответить на связанные вопросы. – wkschwartz

+0

Итак, я закрыл 'bathtub.put (None)' в конце 'faucet()' перед закрытием очереди, но что мне делать с вызовами 'task_done()'?Или я просто переключаюсь с использования 'JoinableQueue' на регулярную« очередь »и избавляюсь от всех вызовов' task_done() '? – wkschwartz

Смежные вопросы