Каждый раз, когда вы открываете файл в режиме записи (w
), создается новый файл - поэтому содержимое файла теряется, если оно уже существует. Только последний дескриптор файла может успешно записать в файл. Даже если вы изменили это в режиме добавления, вы не должны пытаться писать в один файл из нескольких процессов - выход будет искажен, если два процесса попытаются записать в одно и то же время.
Вместо этого, есть все рабочие процессы положить выход в очереди, и имеют единый выделенный процесс (либо подпроцесс или основной процесс) обработки выходного сигнала из очереди и запись в файл:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
sentinel = None
def Simulation(inqueue, output):
for ii in iter(inqueue.get, sentinel):
output.put(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
output.put(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
def handle_output(output):
hdf = pt.openFile('simulation.h5', mode='w')
while True:
args = output.get()
if args:
method, args = args
getattr(hdf, method)(*args)
else:
break
hdf.close()
if __name__ == '__main__':
output = mp.Queue()
inqueue = mp.Queue()
jobs = []
proc = mp.Process(target=handle_output, args=(output,))
proc.start()
for i in range(num_processes):
p = mp.Process(target=Simulation, args=(inqueue, output))
jobs.append(p)
p.start()
for i in range(num_simulations):
inqueue.put(i)
for i in range(num_processes):
# Send the sentinal to tell Simulation to end
inqueue.put(sentinel)
for p in jobs:
p.join()
output.put(None)
proc.join()
Для сравнения, вот версия, которая использует mp.Pool
:
import multiprocessing as mp
import tables as pt
num_arrays = 100
num_processes = mp.cpu_count()
num_simulations = 1000
def Simulation(ii):
result = []
result.append(('createGroup', ('/', 'A%s' % ii)))
for i in range(num_arrays):
result.append(('createArray', ('/A%s' % ii, 'B%s' % i, [ii, i])))
return result
def handle_output(result):
hdf = pt.openFile('simulation.h5', mode='a')
for args in result:
method, args = args
getattr(hdf, method)(*args)
hdf.close()
if __name__ == '__main__':
# clear the file
hdf = pt.openFile('simulation.h5', mode='w')
hdf.close()
pool = mp.Pool(num_processes)
for i in range(num_simulations):
pool.apply_async(Simulation, (i,), callback=handle_output)
pool.close()
pool.join()
Это выглядит проще, не так ли? Однако есть одно значащее различие. Исходный код использовал output.put
для отправки args в handle_output
, который выполнялся в собственном подпроцессе. handle_output
взял args
из очереди output
и обработал их немедленно. С приведенным выше кодом пула Simulation
накапливает целую кучу args
в result
и result
не отправляется на handle_output
до после Simulation
возвращается.
Если Simulation
занимает много времени, будет длительный период ожидания, пока ничего не записывается в simulation.h5
.
В дополнение к этому вопросу я использовал код выше с успехом, но теперь теперь расширяю эту симуляцию, цикл for, заданный a = range (1000), а также цикл for, заданный b = range (100). Этот howerver приводит к широкому использованию моей памяти. У меня 8 процессоров с 16 Гб оперативной памяти, но когда я запускаю файл (даже без реальных симуляций), мое использование ОЗУ составляет 100%, что приводит к остановке моей системы. – user2143958
Я думаю, нам нужно отделить количество подпроцессов от числа задач. Похоже, вы хотите 1000 задач, но, вероятно, не 1000 подпроцессов. Я отредактирую сообщение, чтобы предложить способ, которым вы могли бы это сделать. – unutbu
Да, вы правы, в предыдущем примере для больших итераций было создано одинаковое количество подпроцессов, забивающих всю память. Файл, который вы редактировали, отлично работает! Но только для разъяснения я также экспериментировал с функцией Pool(), и эта функция, похоже, работает неплохо, хотя становится все труднее, когда необходимо передать более одной переменной. Какова основная причина выбора функции Process() над функцией Pool()? – user2143958