2014-12-17 2 views
3
import multiprocessing as mp 

if __name__ == '__main__': 

    #pool = mp.Pool(M) 

    p1 = mp.Process(target= target1, args= (arg1,)) 
    p2 = mp.Process(target= target2, args= (arg1,)) 
    ... 
    p9 = mp.Process(target= target9, args= (arg9,)) 
    p10 = mp.Process(target= target10, args= (arg10,)) 
    ... 
    pN = mp.Process(target= targetN, args= (argN,)) 

    processList = [p1, p2, .... , p9, p10, ... ,pN] 

У меня есть N различных целевых функций, которые потребляют неравное нетривиальное количество времени для выполнения.Выполнить список процессов без многопроцессорной пула map

Я ищу способ выполнить их параллельно, чтобы M (1 < M < N) выполнялись одновременно. И как только процесс завершится, следующий процесс должен начинаться с списка, пока все процессы в processList не будут завершены.

Поскольку я не называю одну и ту же целевую функцию, я не мог использовать Pool.

Я считал делать что-то вроде этого:

for i in range(0, N, M): 
     limit = i + M 
     if(limit > N): 
      limit = N 
     for p in processList[i:limit]: 
      p.join() 

Поскольку мои целевые функции потребляют неодинаковое время выполнения, этот метод не очень эффективен.

Любые предложения? Заранее спасибо.

EDIT: Заголовок вопроса был изменен на «Выполнение списка процессов без многопроцессорной карты пулов» из «Выполнить список процессов без многопроцессорного пула».

+1

Вы можете изменить свой код, чтобы иметь только один цель. –

+0

Да, перестановка может решить проблему. Я не хочу этого делать, потому что мои цели принадлежат к разным модулям, и это часть «большого» проекта.Спасибо @BenjaminToueg! – Shravan

+0

Вы можете сделать это, используя пул. Наличие разных функций только мешает вам использовать 'Pool.map'. См. Ответ Джилл Бейтс. – Dunes

ответ

3

Вот способ сделать это в Python 3.4, который может быть адаптирован для Python 2.7:

targets_with_args = [ 
    (target1, arg1), 
    (target2, arg2), 
    (target3, arg3), 
    ... 
] 

with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor: 
    futures = [executor.submit(target, arg) for target, arg in targets_with_args] 
    results = [future.result() for future in concurrent.futures.as_completed(futures)] 
4

Вы можете использовать proccess Pool:

#!/usr/bin/env python 
# coding=utf-8 

from multiprocessing import Pool 
import random 
import time 


def target_1(): 
    time.sleep(random.uniform(0.5, 2)) 
    print('done target 1') 

def target_2(): 
    time.sleep(random.uniform(0.5, 2)) 
    print('done target 1') 

def target_3(): 
    time.sleep(random.uniform(0.5, 2)) 
    print('done target 1') 

def target_4(): 
    time.sleep(random.uniform(0.5, 2)) 
    print('done target 1') 


pool = Pool(2) # maximum two processes at time. 
pool.apply_async(target_1) 
pool.apply_async(target_2) 
pool.apply_async(target_3) 
pool.apply_async(target_4) 
pool.close() 
pool.join() 

бассейн создан специально для того, что вам нужно сделать - выполнить много задач в ограниченном количестве процессов.

Я также предлагаю вам посмотреть библиотеку concurrent.futures и это backport to Python 2.7. Он имеет ProcessPoolExecutor, который имеет примерно одинаковые возможности, но его методы возвращают объекты Future, и они имеют более удобный API.

+0

Это, кажется, лучшее решение, чем использование одной функции. – phobic

+1

См. Мой ответ для 'ProcessPoolExecutor' –

0

Простым решением будет обернуть функции target {1,2, ... N} в одну функцию forward_to_target, которая перейдет в соответствующую целевую функцию {1,2, ... N} в соответствии с аргументом который передается. Если вы не можете вывести соответствующую целевую функцию из используемых вами аргументов, замените каждый аргумент кортежем (argX, X), а затем в функции forward_to_target распакуйте кортеж и перейдите к соответствующей функции, обозначенной символом X .

0

Вы может иметь два list х целей и аргументов, zip два вместе - и отправить их в функции бегуна (здесь это run_target_on_args):

#!/usr/bin/env python 

import multiprocessing as mp 

# target functions 
targets = [len, str, len, zip] 

# arguments for each function 
args = [["arg1"], ["arg2"], ["arg3"], [["arg5"], ["arg6"]]] 

# applies target function on it's arguments 
def run_target_on_args(target_args): 
    return target_args[0](*target_args[1]) 

pool = mp.Pool() 
print pool.map(run_target_on_args, zip(targets, args)) 
1

Я хотел бы использовать Queue. добавляя к нему процессы от processList, и как только процесс завершится, я удалю его из очереди и добавлю еще один.

код псевдо будет выглядеть следующим образом:

from Queue import Queue 
q = Queue(m) 

# add first process to queue 
i = 0 
q.put(processList[i]) 
processList[i].start() 
i+=1 

while not q.empty(): 
    p=q.get() 

    # check if process is finish. if not return it to the queue for later checking 
    if p.is_alive(): 
     p.put(t) 

    # add another process if there is space and there are more processes to add 
    if not q.full() and i < len(processList): 
     q.put(processList[i]) 
     processList[i].start() 
     i+=1