2016-07-01 2 views
6

Я запускаю несколько процессов, чтобы создать список новых объектов. htop показывает меня между 1 и 4 процессами (я всегда создаю 3 новых объекта).Python3: многопроцессор потребляет много оперативной памяти и замедляется

def foo(self): 
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, self.information) 
     self.new_objs = result.get() 
     pool.terminate() 
    gc.collect() 

Я называю foo() несколько раз, каждый раз, когда он вызывается, весь процесс работает медленнее, программа не даже закончить в конце концов, как это замедляет много. Программа начинает потреблять всю мою оперативную память, в то время как последовательный подход не имеет значительного использования ОЗУ.

Когда я убил программу, большую часть времени это была функция, которую программа выполняла в последний раз.

->File "threading.py", line 293, in wait 
    waiter.acquire() 

Редактировать Чтобы дать какую-то информацию о моих обстоятельствах. Я создаю дерево из узлов. foo() вызывается родительским узлом для создания его дочерних узлов. Возвращаемые процессами result являются дочерними узлами. Они сохраняются в списке в родительском узле. Я хочу распараллелить создание этих дочерних узлов, а не создавать их последовательно.

ответ

2

Я думаю, что ваша проблема связана главным образом с тем, что ваша параллельная функция - это метод объекта. Трудно быть уверенным, без дополнительной информации, но рассмотреть эту небольшую программу игрушка:

import multiprocessing as mp 
import numpy as np 
import gc 


class Object(object): 
    def __init__(self, _): 
     self.data = np.empty((100, 100, 100), dtype=np.float64) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(self.new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def new_obj(self, i): 
     return Object(i) 

    def __del__(self): 
     print("Dead") 


if __name__ == '__main__': 
    c = Container() 
    for j in range(5): 
     c.foo() 

Теперь Container вызывается только один раз, так что вы ожидали бы увидеть "Born", после чего "Dead" быть распечатаны; но поскольку код, выполняемый процессами, является способом контейнера, это означает, что контейнер должен быть выполнен в другом месте! Запуск этого, вы увидите поток приобщена "Born" и "Dead", как ваш контейнер перестраиваются на каждом исполнения карты:

Born 
Born 
Born 
Born 
Born 
Dead 
Born 
Dead 
Dead 
Born 
Dead 
Born 
... 
<MANY MORE LINES HERE> 
... 
Born 
Dead 

Чтобы убедить себя, что весь контейнер копируемый и разослал каждый раз, попытаться установить некоторые не-serialisable значения:

def foo(self): 
    with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, range(50)) 
     self.fn = lambda x: x**2 
     self.objects.extend(result.get()) 
     pool.terminate() 
    gc.collect() 

Который будет немедленно поднимать AttributeError, поскольку он не может сериализацию контейнера.

Подытожим: при отправке 1000 запросов на бассейн, Container будет сериализовать, посланный к процессам и deserialised там в 1000 раз. Конечно, они в конечном итоге будут отброшены (если не будет слишком много странных перекрестных ссылок), но это определенно окажет значительное давление на ОЗУ, поскольку объект сериализуется, называется, обновляется, ресериализуется ... для каждого элемент в ваших сопоставленных входах.

Как вы можете это решить? Ну, в идеале, не разделяют состояние:

def new_obj(_): 
    return Object(_) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

Это завершает в долю времени, и производит только малейший дирижабль на RAM (как один Container будет когда-либо построен).Если вам нужно какое-то внутреннее состояние, которое необходимо передать туда, извлеките его и отправьте только:

def new_obj(tup): 
    very_important_state, parameters = tup 
    return Object(very_important_state=very_important_state, 
        parameters=parameters) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     important_state = len(self.objects) 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, 
            ((important_state, i) for i in range(50))) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

Это имеет такое же поведение, как и раньше. Если вы абсолютно не можете избежать совместного использования некоторого изменчивого состояния между процессами, выйдите из него the multiprocessing tools, чтобы сделать это без необходимости копировать все везде каждый раз.

+0

См. Мое редактирование. Итак, если я правильно понял, мне нужно вызвать метод extern вне моего объекта в каждом процессе? – Jonas

+0

Параллелизированная функция 'self.new_obj', являющаяся методом объекта, _requires_ весь родительский узел, который будет сериализован и отправлен по каждому вызову; если вы можете извлечь этот метод, чтобы _function_ 'new_obj (...)' возвращал (простой, осиротевший, «безгосударственный») новый узел и 'foo' отвечал за его связывание (добавив родителя refs <-> и т. д. .. но в процессе _calling_) вся эта проблема, скорее всего, исчезнет: дочерние процессы требуют отправки только состояния minmal. – val