Я думаю, что лучшим вариантом является использование двух бассейнов здесь:
from multiprocessing import Pool
# import parsers here
parsers = {
'parser1': parser1.process,
'parser2': parser2.process,
'parser3': parser3.process,
'parser4': parser4.process,
'parser5': parser5.process,
'parser6': parser6.process,
'parser7': parser7.process,
}
# Sets that define which items can use high parallelism,
# and which must use low
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"}
low_par = {"parser2", "parser5"}
def process_items(key, value):
parsers[key](value)
def run_pool(func, items, num_items, check_set):
pool = Pool(num_items)
out = pool.map(func, (item for item in items if item[0] in check_set))
pool.close()
pool.join()
return out
if __name__ == "__main__":
items = [('parser2', x), ...] # Your list of tuples
# Process with high parallelism
high_results = run_pool(process_items, items, 4, high_par)
# Process with low parallelism
low_results = run_pool(process_items, items, 2, low_par)
Попытка сделать это в одном Pool
возможно, благодаря умному использованию примитивов синхронизации, но я не думаю, что это будет выглядеть намного чище, чем это. Это также может закончиться менее эффективно, так как иногда вашему пулу придется ждать, пока работа закончится, поэтому он может обрабатывать элемент с низким параллелизмом, даже если в очереди находятся элементы с высоким параллелизмом.
Это усложняться немного, если вам нужно, чтобы получить результаты от каждого process_items
вызова в том же порядке, как они упали в первоначальном Iterable, то есть результаты от каждого Pool
нужно сольются, но, основываясь на вашем примере I не думайте, что это требование. Дайте мне знать, если это так, и я постараюсь соответствующим образом настроить свой ответ.
Не могли бы вы просто использовать два бассейна? Во-первых, создайте пул, который использует все ваши ядра, перебирает список, отфильтровывает записи, которые требуют уменьшенного параллелизма, и называет «pool1.map» остальными элементами. Затем закройте/присоединитесь к этому пулу. Затем создайте новый пул с меньшим количеством процессов и вызовите 'map' только для записей в итерабеле, которые * do * нуждаются в уменьшенном параллелизме. – dano
Это был единственный вариант, о котором я мог думать, я надеялся, что может быть более чистый путь? Или, может быть, это достаточно чисто? –
Другие варианты, о которых я могу думать, вероятно, * менее * чистые - вам понадобится какая-то синхронизация для всех ваших работников, чтобы обрабатывать ее всего одним пулом. И вам также нужно иметь дело с случаями, когда некоторые работники работают, когда вы попадаете на предметы, которые нуждаются в уменьшенном параллелизме, а это означает, что вам нужно будет подождать, пока другие работники не будут обработаны. Похоже, что это закончится тем, что станет беспорядочным, чтобы поправиться. – dano