Я пытаюсь найти максимальный вес около 6,1 миллиарда (пользовательских) предметов, и я хотел бы сделать это с параллельной обработкой. Для моего конкретного приложения есть лучшие алгоритмы, которые не требуют моего итерирования более 6,1 млрд. Предметов, но учебник, который их объясняет, над моей головой, и мой босс хочет, чтобы это было сделано за 4 дня. Я подумал, что у меня есть лучший шанс с фантазийным сервером моей компании и параллельной обработкой. Однако все, что я знаю о параллельной обработке, происходит от чтения Pythondocumentation. То есть я довольно потерян ...Избежать условий гонки в многопроцессорных очередях Python 3
Моя нынешняя теория - это настроить процесс подачи, очередь ввода, целую группу (скажем, 30) рабочих процессов и очередь вывода (найти максимальный элемент в очереди вывода будет тривиальным). То, что я не понимаю, - это то, как процесс фидера может сообщать рабочим процессам, когда нужно прекратить ждать, пока элементы попадут во входную очередь.
Я думал об использовании multiprocessing.Pool.map_async
на моем истребителе предметов 6.1E9, но для прохождения итерации предметов требуется почти 10 минут, не делая ничего для них. Если я не понимаю что-то ..., имея map_async
итерацию через них, чтобы назначить их процессам, можно было бы сделать, пока процессы начнут свою работу. (Pool
также imap
но documentation говорит, что это похоже на map
, который не появляется на работе асинхронно Я хочу асинхронный, правый.?)
Похожие вопросы: Хочу ли я использовать concurrent.futures
вместо multiprocessing
? Я не мог быть первым человеком, который мог бы реализовать систему с двумя очередями (именно так работают линии в каждом гастроном в Америке ...), так есть ли еще Pythonic/встроенный способ сделать это?
Вот скелет того, что я пытаюсь сделать. Смотрите блок комментариев посередине.
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
ВОТ ОТВЕТ
Я нашел очень тщательный ответ на мой вопрос, и нежное введение в многозадачной из Python Foundation директор по коммуникациям Даг Hellman. То, что я хотел, было образцом «ядовитой таблетки». Посмотрите здесь: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
Подходит для @MRAB для публикации ядра этой концепции.
Почему вы «импортируете очередь», если используете «multiprocessing.Queue»? –
Я использую его, чтобы поймать исключение 'queue.Empty', когда рабочий смотрит на свою входную очередь. Мое задумчивое мышление заключалось в том, что это исключение можно было бы выбросить, если и только если очередь была закрыта, и она также была пустой. Обратите внимание, что при использовании метода @ MRAB в его ответе импорт «очереди» будет невозможен. – wkschwartz