2013-11-27 4 views
0

У меня проблема с расплывающимися параллелизмами, состоящая из множества задач, которые решаются независимо друг от друга. Решение каждой из задач довольно продолжительное, поэтому это основной кандидат на многопроцессорную обработку.Создание и повторное использование объектов в процессах python

Проблема в том, что решение моих задач требует создания определенного объекта, который сам по себе занимает много времени, но может использоваться повторно для всех задач (подумайте о внешней двоичной программе, которая должна быть запущена), поэтому в серийном версия я сделать что-то вроде этого:

def costly_function(task, my_object): 
    solution = solve_task_using_my_object 
    return solution 

def solve_problem(): 
    my_object = create_costly_object() 
    tasks = get_list_of_tasks() 
    all_solutions = [costly_function(task, my_object) for task in tasks] 
    return all_solutions 

Когда я пытаюсь распараллелить программу с использованием многопроцессорной обработки, my_object не может быть передан в качестве параметра для целого ряда причин (он не может быть маринованные, и он не должен работать больше, чем один задача в то же время), поэтому мне приходится прибегать к созданию отдельного экземпляра объекта для каждой задачи:

def costly_function(task): 
    my_object = create_costly_object() 
    solution = solve_task_using_my_object 
    return solution 

def psolve_problem(): 
    pool = multiprocessing.Pool() 
    tasks = get_list_of_tasks() 
    all_solutions = pool.map_async(costly_function, tasks) 
    return all_solutions.get() 

, но дополнительные затраты на создание нескольких экземпляров my_object делают этот код лишь незначительно быстрее, чем сериализованный.

Если бы я мог создать отдельный экземпляр my_object в каждом процессе, а затем повторно использовать их для всех задач, которые будут выполняться в этом процессе, мои тайминги значительно улучшатся. Любые указатели на то, как это сделать?

ответ

3

Я нашел простой способ решить мою собственную проблему без привлечения каких-либо инструментов помимо стандартной библиотеки, я думал, что буду писать ее здесь, если у кого-то еще есть аналогичная проблема.

multiprocessing.Pool принимает функцию initializer (с аргументами), которая запускается при запуске каждого процесса. Возвращаемое значение этой функции нигде не сохраняется, но можно воспользоваться функцией, чтобы создать глобальную переменную:

def init_process(): 
    global my_object 
    my_object = create_costly_object() 

def costly_function(task): 
    global my_object 
    solution = solve_task_using_my_object 
    return solution 

def psolve_problem(): 
    pool = multiprocessing.Pool(initializer=init_process) 
    tasks = get_list_of_tasks() 
    all_solutions = pool.map_async(costly_function, tasks) 
    return all_solutions.get() 

Поскольку каждый процесс имеет отдельный глобальное пространство имен, то воплощенные объекты не конфликтует, и они создаются только один раз для каждого процесса.

Возможно, это не самое элегантное решение, но оно достаточно простое и дает мне почти линейное ускорение.

1

вы можете иметь celery project обрабатывать все это для вас, среди многих других особенностей также есть способ запустить какую-то задачу initialization, которая может быть использована последняя всеми задачами

+0

Спасибо, сельдерей выглядит великолепно, но слишком много излишнего для моих целей, отдельный брокер для обмена сообщениями и весь шаблон приложения довольно немного для меня ... – Javier

1

Вы правы, что вы ограничены в pickable объекты при использовании multiprocessing. Вы абсолютно уверены в том, что ваш объект неадекватен?

Вы пробовали dill? Если вы импортируете его, в любое время, когда соленое название называется, оно будет использовать привязки укропа. Это сработало для меня, когда я пытался использовать многопроцессорность по уравнениям sympy.

+0

Даже если мне удалось разложить мой объект (что было бы жесткий, поскольку он включает системные двоичные файлы, запущенные за пределами интерпретатора python), он должен поддерживать постоянное состояние для выполнения задачи; либо это состояние будет перезаписано, если передать один и тот же объект на две разные задачи, выполняемые одновременно, или мне нужно использовать блокировку объекта (которая побеждает цель многопроцессорности). – Javier

Смежные вопросы