Я думаю, что ваша проблема связана главным образом с тем, что ваша параллельная функция - это метод объекта. Трудно быть уверенным, без дополнительной информации, но рассмотреть эту небольшую программу игрушка:
import multiprocessing as mp
import numpy as np
import gc
class Object(object):
def __init__(self, _):
self.data = np.empty((100, 100, 100), dtype=np.float64)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def new_obj(self, i):
return Object(i)
def __del__(self):
print("Dead")
if __name__ == '__main__':
c = Container()
for j in range(5):
c.foo()
Теперь Container
вызывается только один раз, так что вы ожидали бы увидеть "Born"
, после чего "Dead"
быть распечатаны; но поскольку код, выполняемый процессами, является способом контейнера, это означает, что контейнер должен быть выполнен в другом месте! Запуск этого, вы увидите поток приобщена "Born"
и "Dead"
, как ваш контейнер перестраиваются на каждом исполнения карты:
Born
Born
Born
Born
Born
Dead
Born
Dead
Dead
Born
Dead
Born
...
<MANY MORE LINES HERE>
...
Born
Dead
Чтобы убедить себя, что весь контейнер копируемый и разослал каждый раз, попытаться установить некоторые не-serialisable значения:
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(self.new_obj, range(50))
self.fn = lambda x: x**2
self.objects.extend(result.get())
pool.terminate()
gc.collect()
Который будет немедленно поднимать AttributeError
, поскольку он не может сериализацию контейнера.
Подытожим: при отправке 1000 запросов на бассейн, Container
будет сериализовать, посланный к процессам и deserialised там в 1000 раз. Конечно, они в конечном итоге будут отброшены (если не будет слишком много странных перекрестных ссылок), но это определенно окажет значительное давление на ОЗУ, поскольку объект сериализуется, называется, обновляется, ресериализуется ... для каждого элемент в ваших сопоставленных входах.
Как вы можете это решить? Ну, в идеале, не разделяют состояние:
def new_obj(_):
return Object(_)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj, range(50))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
Это завершает в долю времени, и производит только малейший дирижабль на RAM (как один Container
будет когда-либо построен).Если вам нужно какое-то внутреннее состояние, которое необходимо передать туда, извлеките его и отправьте только:
def new_obj(tup):
very_important_state, parameters = tup
return Object(very_important_state=very_important_state,
parameters=parameters)
class Container(object):
def __new__(cls):
self = object.__new__(cls)
print("Born")
return self
def __init__(self):
self.objects = []
def foo(self):
important_state = len(self.objects)
with mp.Pool(processes=3, maxtasksperchild=10) as pool:
result = pool.map_async(new_obj,
((important_state, i) for i in range(50)))
self.objects.extend(result.get())
pool.terminate()
gc.collect()
def __del__(self):
print("Dead")
Это имеет такое же поведение, как и раньше. Если вы абсолютно не можете избежать совместного использования некоторого изменчивого состояния между процессами, выйдите из него the multiprocessing tools, чтобы сделать это без необходимости копировать все везде каждый раз.
См. Мое редактирование. Итак, если я правильно понял, мне нужно вызвать метод extern вне моего объекта в каждом процессе? – Jonas
Параллелизированная функция 'self.new_obj', являющаяся методом объекта, _requires_ весь родительский узел, который будет сериализован и отправлен по каждому вызову; если вы можете извлечь этот метод, чтобы _function_ 'new_obj (...)' возвращал (простой, осиротевший, «безгосударственный») новый узел и 'foo' отвечал за его связывание (добавив родителя refs <-> и т. д. .. но в процессе _calling_) вся эта проблема, скорее всего, исчезнет: дочерние процессы требуют отправки только состояния minmal. – val