2015-04-21 2 views
6

У меня есть скрипт python, который запускает метод параллельно.Как изменить количество параллельных процессов?

parsers = { 
    'parser1': parser1.process, 
    'parser2': parser2.process 
} 

def process((key, value)): 
    parsers[key](value) 

pool = Pool(4) 
pool.map(process_items, items) 

process_items мой метод и items список кортежей с двумя элементами для каждого кортежа. Список items имеет около 100 тыс. Единиц.

process_items затем вызовет метод в зависимости от параметров. Моя проблема в том, что, возможно, 70% списка, которое я могу запустить с высоким параллелизмом, но остальные 30% могут работать только с 1/2 потоками, иначе это приведет к сбою вне моего контроля.

Так что в моем коде у меня около 10 различных процессов парсера. Например, 1-8 я хочу работать с бассейном (4), но 9-10 пула (2).

Что такое лучший способ оптимизировать это?

+0

Не могли бы вы просто использовать два бассейна? Во-первых, создайте пул, который использует все ваши ядра, перебирает список, отфильтровывает записи, которые требуют уменьшенного параллелизма, и называет «pool1.map» остальными элементами. Затем закройте/присоединитесь к этому пулу. Затем создайте новый пул с меньшим количеством процессов и вызовите 'map' только для записей в итерабеле, которые * do * нуждаются в уменьшенном параллелизме. – dano

+0

Это был единственный вариант, о котором я мог думать, я надеялся, что может быть более чистый путь? Или, может быть, это достаточно чисто? –

+0

Другие варианты, о которых я могу думать, вероятно, * менее * чистые - вам понадобится какая-то синхронизация для всех ваших работников, чтобы обрабатывать ее всего одним пулом. И вам также нужно иметь дело с случаями, когда некоторые работники работают, когда вы попадаете на предметы, которые нуждаются в уменьшенном параллелизме, а это означает, что вам нужно будет подождать, пока другие работники не будут обработаны. Похоже, что это закончится тем, что станет беспорядочным, чтобы поправиться. – dano

ответ

2

Я думаю, что лучшим вариантом является использование двух бассейнов здесь:

from multiprocessing import Pool 
# import parsers here 

parsers = { 
    'parser1': parser1.process, 
    'parser2': parser2.process, 
    'parser3': parser3.process, 
    'parser4': parser4.process, 
    'parser5': parser5.process, 
    'parser6': parser6.process, 
    'parser7': parser7.process, 
} 

# Sets that define which items can use high parallelism, 
# and which must use low 
high_par = {"parser1", "parser3", "parser4", "parser6", "parser7"} 
low_par = {"parser2", "parser5"} 

def process_items(key, value): 
    parsers[key](value) 

def run_pool(func, items, num_items, check_set): 
    pool = Pool(num_items) 
    out = pool.map(func, (item for item in items if item[0] in check_set)) 
    pool.close() 
    pool.join() 
    return out 

if __name__ == "__main__": 
    items = [('parser2', x), ...] # Your list of tuples 
    # Process with high parallelism 
    high_results = run_pool(process_items, items, 4, high_par) 
    # Process with low parallelism 
    low_results = run_pool(process_items, items, 2, low_par) 

Попытка сделать это в одном Pool возможно, благодаря умному использованию примитивов синхронизации, но я не думаю, что это будет выглядеть намного чище, чем это. Это также может закончиться менее эффективно, так как иногда вашему пулу придется ждать, пока работа закончится, поэтому он может обрабатывать элемент с низким параллелизмом, даже если в очереди находятся элементы с высоким параллелизмом.

Это усложняться немного, если вам нужно, чтобы получить результаты от каждого process_items вызова в том же порядке, как они упали в первоначальном Iterable, то есть результаты от каждого Pool нужно сольются, но, основываясь на вашем примере I не думайте, что это требование. Дайте мне знать, если это так, и я постараюсь соответствующим образом настроить свой ответ.

+0

Спасибо, прошу прощения. Это, наверное, лучший способ, так как я хочу, чтобы все было просто. –

1

Вы можете указать количество параллельных потоков в конструкторе для multiprocessing.Pool:

from multiprocessing import Pool 

def f(x): 
    return x*x 

if __name__ == '__main__': 
    pool = Pool(5) # 5 is the number of parallel threads 
    print pool.map(f, [1, 2, 3]) 
+0

Извините, я обновил свой вопрос, так как я не объяснил себя правильно. Я уже установил процессы, и только один/два из моих процессов не могут работать на этом параллелизме и не сработают. –

Смежные вопросы