0

Я использую структуру master-slaves для реализации параллельного вычисления. Один мастер-процесс (0) загружает данные и распределяет соответствующие куски и инструкции для подчиненных процессов (1 - N), которые выполняют тяжелый подъем, используя большие объекты ... бла-бла-бла. Проблема заключается в использовании памяти, которую я отслеживаю с использованием resource.getrusage(resource.RUSAGE_SELF).ru_maxrss для каждого подчиненного процесса.Освобождение памяти в циклах параллельных процессов python

В первой задаче используется около 6 ГБ памяти, как и ожидалось, но когда подчиненное устройство получает вторую задачу, оно поднимается до чуть более 10 ГБ --- как если бы предыдущая память не собиралась. Я понял, что как только переменная теряет свои ссылки (в приведенном ниже коде, когда сбрасывается переменная _gwb) сбор мусора должен очищать дом. Почему это не происходит?

Будет ли выбрасывание в del _gwb в конце каждого цикла помощи?
Как насчет ручного вызова gc.collect()?
Или мне нужно нерест subprocess es as described in this answer?

Я использую mpi4py на управляемом кластером SLURM.

мастер процесс выглядит примерно так:

for jj, tt in enumerate(times): 

    for ii, sim in enumerate(sims): 

     search = True 
     # Find a slave to give this task to 
     while search: 
      # Repackage HDF5 data into dictionary to work with MPI 
      sim_dat = ... # load some data 

      # Look for available slave process 
      data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG) 
      src = stat.Get_source() 

      # Store Results 
      if tag == TAGS.DONE: 
       _store_slave_results(data, ...) 
       num_done += 1 
      elif tag == TAGS.READY: 
       # Distribute tasks 
       comm.send(sim_data, dest=src, tag=TAGS.START) 
       # Stop searching, move to next task 
       search = False 

      cycles += 1 

И в рабов:

while True: 
    # Tell Master this process is ready 
    comm.send(None, dest=0, tag=TAGS.READY) 
    # Receive ``task`` ([number, gravPot, ndensStars]) 
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) 
    tag = stat.Get_tag() 

    if tag == TAGS.START: 
     _gwb = Large_Data_Structure(task) 
     data = _gwb.do_heavy_lifting(task) 
     comm.send(data, dest=0, tag=TAGS.DONE) 
    elif tag == TAGS.EXIT: 
     break 

    cycles += 1 

Edit: Некоторые другие странные тонкости (в случае, если они могут иметь отношение) :
1) только некоторые процессы показывают, что память растет, othe r остаются примерно одинаковыми;
2) Удельный объем памяти активного вещество отличается от различных ведомые процессов (отличающиеся 100s of MB ... хотя они обязательно должны быть запущены и тем же код!

ответ

1

del _gwb должен сделать большую разницу. С _gwb = Large_Data_Structure(task) новые данные генерируются, а затем присваиваются _gwd.Только тогда публикуются старые данные. Специфический del рано избавится от объекта. Вы можете увидеть увеличение памяти для второго цикла - python освобождает объект в свою кучу, но нет ничего, что могло бы сказать, что следующее распределение получит точно такой же набор памяти.

Сборщик мусора входит в игру только в случаях, когда регулярного подсчета ссылок недостаточно для запуска освобождения памяти. Предполагая, что do_heavy_lifting не делает ничего напуганного, это не изменит ситуацию.

Вы упомянули subprocess ... еще один вариант на системах Linux - os.fork. Детский процесс получает представление «копирование на запись» родительского адресного пространства. Большой объект генерируется в дочерней памяти и исчезает при выходе. Я не могу гарантировать, что это сработает, но будет интересным экспериментом.

while True: 
    # Tell Master this process is ready 
    comm.send(None, dest=0, tag=TAGS.READY) 
    # Receive ``task`` ([number, gravPot, ndensStars]) 
    task = comm.recv(source=0, tag=MPI.ANY_TAG, status=stat) 
    tag = stat.Get_tag() 

    if tag == TAGS.START: 
     pid = os.fork() 
     if pid: 
      # parent waits for child 
      os.waitpid(pid) 
     else: 
      # child does work, sends results and exits 
      _gwb = Large_Data_Structure(task) 
      data = _gwb.do_heavy_lifting(task) 
      comm.send(data, dest=0, tag=TAGS.DONE) 
      os._exit() 
    elif tag == TAGS.EXIT: 
     break 

    cycles += 1 
+0

Спасибо! Я попробую. Несколько странных тонкостей (в случае, если они могут быть релевантными): 1) только некоторые процессы показывают рост памяти, другие остаются примерно одинаковыми; 2) Конкретный объем активной памяти * * различен * на разных подчиненных процессах ... хотя они обязательно должны работать с одним и тем же кодом! – DilithiumMatrix

+0

Не могу сказать, почему это так. Вычисленные данные могут быть чувствительны к входящим параметрам (например, 'range (count)' различается в зависимости от того, является ли счет 1 или 10000000). Просто догадка. – tdelaney

+0

Хмм, я не могу найти места, где размеры данных должны изменяться между процессами. Я попробовал его с помощью 'del _gwb', и никаких изменений в поведении памяти не произошло ... Объект' _gwb' хранит некоторые ссылки на внешние объекты, например. '_gwb = Large_Data_Structure (task, other_obj)' и в конструкторе '_gwb':' (self.other = other_obj) '... может ли это сохранить память от сбора? – DilithiumMatrix

Смежные вопросы