2015-04-02 4 views
1

Я использую библиотеку Python multiprocessing для обработки списка входов со встроенным методом map(). Вот соответствующий фрагмент кода:Печать строки обновления всякий раз, когда подпроцесс заканчивается в многопроцессорном пуле Python 3

subp_pool = Pool(self.subprocesses) 
cases = subp_pool.map(self.get_case, input_list) 
return cases 

Функция, которая будет работать параллельно является self.get_case(), а список входов input_list.

Я хотел бы напечатать прогресс строки на стандартный вывод в следующем формате:

Working (25/100 cases processed) 

Как я могу обновить локальную переменную внутри класса, который содержит пул, так что всякий раз, когда подпроцесс, то переменная увеличивается на 1 (а затем печатается на стандартный вывод)?

ответ

1

Невозможно сделать это, используя multiprocessing.map, поскольку он не предупреждает о главном процессе ни о чем, пока не выполнит все его задачи. Тем не менее, вы можете получить подобное поведение с помощью apply_async в тандеме с аргументом callback ключевого слова:

from multiprocessing.dummy import Pool 
from functools import partial 
import time 

class Test(object): 
    def __init__(self): 
     self.count = 0 
     self.threads = 4 

    def get_case(self, x): 
     time.sleep(x) 

    def callback(self, total, x): 
     self.count += 1 
     print("Working ({}/{}) cases processed.".format(self.count, total)) 

    def do_async(self): 
     thread_pool = Pool(self.threads) 
     input_list = range(5) 
     callback = partial(self.callback, len(input_list)) 
     tasks = [thread_pool.apply_async(self.get_case, (x,), 
             callback=callback) for x in input_list] 
     return [task.get() for task in tasks] 

if __name__ == "__main__": 
    t = Test() 
    t.do_async() 
+0

Почему вы используете 'multiprocessing.dummy' вместо' multiprocessing'? – mittelmania

+1

@mittelmania Ваш код ссылался на темы, поэтому я предположил, что вы их тоже использовали. Не стесняйтесь удалять '.dummy' из моего примера, он должен работать точно так же (предполагая, что вы используете Python 3.x, как указано в вопросе). – dano

0

Вызовите print_data() из метода get_case(), и вы сделали.

from threading import Lock 

Class A(object): 

def __init__(self): 
    self.mutex = Lock() 
    self.count = 0 

def print_data(self): 
    self.mutex.acquire() 
    try: 
     self.count += 1 
     print('Working (' + str(self.count) + 'cases processed)') 
    finally: 
     self.mutex.release() 
+0

То, что это не будет работать, если вы используете пул процессов вместо пула потоков. – dano

Смежные вопросы