2013-03-29 2 views
6

Это похоже на простую проблему, но я не могу обойти ее.Запись данных в файл hdf с использованием многопроцессорной обработки

У меня есть симуляция, которая работает в цикле double for и записывает результаты в файл HDF. Простая версия этой программы показана ниже:

import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    for ii in a: 
     print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 
Simulation() 

Этот код делает именно то, что я хочу, но так как этот процесс может занять некоторое время, чтобы запустить я пытался использовать модуль многопроцессорной и использовать следующий код:

import multiprocessing 
import tables as pt 

a = range(10) 
b = range(5) 

def Simulation(ii): 
    hdf = pt.openFile('simulation.h5',mode='w') 
    print(ii) 
     hdf.createGroup('/','A%s'%ii) 
     for i in b: 
      hdf.createArray('/A%s'%ii,'B%s'%i,[ii,i]) 
     hdf.close() 
    return 

if __name__ == '__main__': 
    jobs = [] 
    for ii in a: 
     p = multiprocessing.Process(target=Simulation, args=(ii,)) 
     jobs.append(p)  
     p.start() 

Это, однако, только печатает последнюю симуляцию в файле HDF, так или иначе она переопределяет все остальные группы.

ответ

10

Каждый раз, когда вы открываете файл в режиме записи (w), создается новый файл - поэтому содержимое файла теряется, если оно уже существует. Только последний дескриптор файла может успешно записать в файл. Даже если вы изменили это в режиме добавления, вы не должны пытаться писать в один файл из нескольких процессов - выход будет искажен, если два процесса попытаются записать в одно и то же время.

Вместо этого, есть все рабочие процессы положить выход в очереди, и имеют единый выделенный процесс (либо подпроцесс или основной процесс) обработки выходного сигнала из очереди и запись в файл:


import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 
sentinel = None 


def Simulation(inqueue, output): 
    for ii in iter(inqueue.get, sentinel): 
     output.put(('createGroup', ('/', 'A%s' % ii))) 
     for i in range(num_arrays): 
      output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 


def handle_output(output): 
    hdf = pt.openFile('simulation.h5', mode='w') 
    while True: 
     args = output.get() 
     if args: 
      method, args = args 
      getattr(hdf, method)(*args) 
     else: 
      break 
    hdf.close() 

if __name__ == '__main__': 
    output = mp.Queue() 
    inqueue = mp.Queue() 
    jobs = [] 
    proc = mp.Process(target=handle_output, args=(output,)) 
    proc.start() 
    for i in range(num_processes): 
     p = mp.Process(target=Simulation, args=(inqueue, output)) 
     jobs.append(p) 
     p.start() 
    for i in range(num_simulations): 
     inqueue.put(i) 
    for i in range(num_processes): 
     # Send the sentinal to tell Simulation to end 
     inqueue.put(sentinel) 
    for p in jobs: 
     p.join() 
    output.put(None) 
    proc.join() 

Для сравнения, вот версия, которая использует mp.Pool:

import multiprocessing as mp 
import tables as pt 


num_arrays = 100 
num_processes = mp.cpu_count() 
num_simulations = 1000 


def Simulation(ii): 
    result = [] 
    result.append(('createGroup', ('/', 'A%s' % ii))) 
    for i in range(num_arrays): 
     result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i]))) 
    return result 


def handle_output(result): 
    hdf = pt.openFile('simulation.h5', mode='a') 
    for args in result: 
     method, args = args 
     getattr(hdf, method)(*args) 
    hdf.close() 


if __name__ == '__main__': 
    # clear the file 
    hdf = pt.openFile('simulation.h5', mode='w') 
    hdf.close() 
    pool = mp.Pool(num_processes) 
    for i in range(num_simulations): 
     pool.apply_async(Simulation, (i,), callback=handle_output) 
    pool.close() 
    pool.join() 

Это выглядит проще, не так ли? Однако есть одно значащее различие. Исходный код использовал output.put для отправки args в handle_output, который выполнялся в собственном подпроцессе. handle_output взял args из очереди output и обработал их немедленно. С приведенным выше кодом пула Simulation накапливает целую кучу args в result и result не отправляется на handle_output до после Simulation возвращается.

Если Simulation занимает много времени, будет длительный период ожидания, пока ничего не записывается в simulation.h5.

+0

В дополнение к этому вопросу я использовал код выше с успехом, но теперь теперь расширяю эту симуляцию, цикл for, заданный a = range (1000), а также цикл for, заданный b = range (100). Этот howerver приводит к широкому использованию моей памяти. У меня 8 процессоров с 16 Гб оперативной памяти, но когда я запускаю файл (даже без реальных симуляций), мое использование ОЗУ составляет 100%, что приводит к остановке моей системы. – user2143958

+0

Я думаю, нам нужно отделить количество подпроцессов от числа задач. Похоже, вы хотите 1000 задач, но, вероятно, не 1000 подпроцессов. Я отредактирую сообщение, чтобы предложить способ, которым вы могли бы это сделать. – unutbu

+0

Да, вы правы, в предыдущем примере для больших итераций было создано одинаковое количество подпроцессов, забивающих всю память. Файл, который вы редактировали, отлично работает! Но только для разъяснения я также экспериментировал с функцией Pool(), и эта функция, похоже, работает неплохо, хотя становится все труднее, когда необходимо передать более одной переменной. Какова основная причина выбора функции Process() над функцией Pool()? – user2143958

Смежные вопросы

 Смежные вопросы