Я пытаюсь понять, как я могу обрабатывать ориентированный ациклический граф параллельно. Каждый узел должен только «выполнять», когда все его входные узлы были обработаны заранее. Представьте себе класс Task
со следующим интерфейсом:параллельная обработка DAG
class Task(object):
result = None
def inputs(self):
''' List all requirements of the task. '''
return()
def run(self):
pass
Я не могу придумать способ, чтобы обработать график, который может быть представлен по этой структуре асинхронно с максимальным числом рабочих на то же время, за исключением один способ.
I думаю оптимальная обработка будет достигнута путем создания потока для каждой задачи, ожидающей обработки всех входов. Но, порождая поток для каждой задачи немедленно, а не последовательно (т. Е. Когда задача готова к обработке) для меня не кажется хорошей идеей.
import threading
class Runner(threading.Thread):
def __init__(self, task):
super(Runner, self).__init__()
self.task = task
self.start()
def run(self):
threads = [Runner(r) for r in self.task.inputs()]
[t.join() for t in threads]
self.task.run()
Есть ли способ лучше подражать этому поведению? Кроме того, этот подход в настоящее время не реализует способ ограничения количества выполняемых задач на этапе .