У меня есть этот питон код для чтения файла, сделать некоторые обработки и записью результатов параллельно:питон ошибка очереди многопроцессорной
def line_chunker(path):
"""
Reads a file in chunks and yields each chunk.
Each chunk is guaranteed to end at a carriage return (EOL).
Each chunk is returned a single string.
The number of chunks the file is split into is equal to the number of CPU cores
available
"""
size = os.path.getsize(path)
cores = mp.cpu_count()
chunksize = size/cores # gives truncated integer
f = open(path)
s = f.readline() # skip header
while True:
part = f.readlines(chunksize)
if not part:
f.close()
break
else:
yield "".join(part)
f.close()
def _validate(chunk, outq):
""" Performs format validation on a given chunk of a csv file """
rows = csv.reader(StringIO(chunk))
vld = validation.RowValidator(rows)
vld.check_rows()
outq.put(vld.errors)
def _write(outq):
"""Writes lines in the outq to a text file """
outfile = open("C:/testoutput.txt", "w")
while True:
result = outq.get()
if result is None:
outfile.close()
break
else:
for line in result:
outfile.write(line)
outfile.write("\n")
def validate_pll(path):
""" Perform validation in parallel """
pool = mp.Pool()
outq = mp.Manager().Queue(maxsize = 8)
writer = mp.Process(target = _write, args = (outq,))
writer.start()
for chunk in line_chunker(path):
pool.apply_async(_validate, (chunk, outq))
pool.close()
pool.join()
Он считывает файл в кусках и для каждого фрагмента начинается процесс, чтобы сделать то обработка. Результаты обработки помещаются в очередь, на которую смотрит другой процесс.
Код запускается, но после завершения я получаю нечетный EOFError
.
Я подозреваю, что это потому, что я не называю writer.join()
, но если я добавляю эту линию, например, так:
def validate_pll(path):
""" Perform validation in parallel """
pool = mp.Pool()
outq = mp.Manager().Queue(maxsize = 8)
writer = mp.Process(target = _write, args = (outq,))
writer.start()
for chunk in line_chunker(path):
pool.apply_async(_validate, (chunk, outq))
pool.close()
pool.join()
writer.join()
код просто зависает. Любая идея, что я делаю неправильно?
Сообщение об ошибке получили в жизни:
Process Process-10:
Traceback (most recent call last):
File C\Anaconda\lib\multiprocessing\process.py, line 258, in _bootstrap
self.run()
File C\Anaconda\lib\multiprocessing\process.py line 114, in run
self._target(*self._args, **self._kwargs)
File C:\SVN\PortfolioInspector\trunk\parallel.py, line 114 in _write
result = outq.get()
File "(string)", line 2, in get
File C\Anaconda\lib\multiprocessing\managers.py, line 759, in _callmethod
kind, result = conn.recv()
EOFError
duh! конечно. Я думаю, что я думал, что очередь автоматически выдаст None, если там ничего нет, но, конечно, если вы вызовеете get() в пустой очереди, он просто ждет, чтобы что-то вернуть. Я добавил '' outq.put (None) '' перед '' pool.close() '', и теперь он отлично работает. – jramm