2014-09-23 3 views
0

У меня есть этот питон код для чтения файла, сделать некоторые обработки и записью результатов параллельно:питон ошибка очереди многопроцессорной

def line_chunker(path): 
    """ 
    Reads a file in chunks and yields each chunk. 
    Each chunk is guaranteed to end at a carriage return (EOL). 
    Each chunk is returned a single string. 

    The number of chunks the file is split into is equal to the number of CPU cores 
    available 
    """ 
    size = os.path.getsize(path) 
    cores = mp.cpu_count() 
    chunksize = size/cores # gives truncated integer 

    f = open(path) 
    s = f.readline() # skip header 
    while True: 
     part = f.readlines(chunksize) 
     if not part: 
      f.close() 
      break 
     else: 
      yield "".join(part) 
    f.close() 

def _validate(chunk, outq): 
    """ Performs format validation on a given chunk of a csv file """ 
    rows = csv.reader(StringIO(chunk)) 
    vld = validation.RowValidator(rows) 
    vld.check_rows() 
    outq.put(vld.errors) 

def _write(outq): 
    """Writes lines in the outq to a text file """ 
    outfile = open("C:/testoutput.txt", "w") 
    while True: 
     result = outq.get() 
     if result is None: 
      outfile.close() 
      break 
     else: 
      for line in result: 
       outfile.write(line) 
       outfile.write("\n") 

def validate_pll(path):  
    """ Perform validation in parallel """ 

    pool = mp.Pool() 
    outq = mp.Manager().Queue(maxsize = 8) 

    writer = mp.Process(target = _write, args = (outq,)) 
    writer.start() 
    for chunk in line_chunker(path): 
     pool.apply_async(_validate, (chunk, outq)) 

    pool.close() 
    pool.join() 

Он считывает файл в кусках и для каждого фрагмента начинается процесс, чтобы сделать то обработка. Результаты обработки помещаются в очередь, на которую смотрит другой процесс.

Код запускается, но после завершения я получаю нечетный EOFError.

Я подозреваю, что это потому, что я не называю writer.join(), но если я добавляю эту линию, например, так:

def validate_pll(path):  
    """ Perform validation in parallel """ 

    pool = mp.Pool() 
    outq = mp.Manager().Queue(maxsize = 8) 

    writer = mp.Process(target = _write, args = (outq,)) 
    writer.start() 
    for chunk in line_chunker(path): 
     pool.apply_async(_validate, (chunk, outq)) 

    pool.close() 
    pool.join() 
    writer.join() 

код просто зависает. Любая идея, что я делаю неправильно?

Сообщение об ошибке получили в жизни:

Process Process-10: 
Traceback (most recent call last): 
    File C\Anaconda\lib\multiprocessing\process.py, line 258, in _bootstrap 
     self.run() 
    File C\Anaconda\lib\multiprocessing\process.py line 114, in run 
     self._target(*self._args, **self._kwargs) 
    File C:\SVN\PortfolioInspector\trunk\parallel.py, line 114 in _write 
     result = outq.get() 
    File "(string)", line 2, in get 
    File C\Anaconda\lib\multiprocessing\managers.py, line 759, in _callmethod 
     kind, result = conn.recv() 
EOFError 

ответ

1

Процесс _writer все еще ждет записей, которые будут записаны в outq, когда заканчивается основной процесс. Он ждет записей, открывая блокирующее соединение с процессом Manager, который управляет общим Queue. Теперь, в тот момент, когда основной процесс завершает свое выполнение, процесс Manager завершает работу, который отправляет EOF к соединению, которое _writer открыл, и вы видите это исключение.

Чтобы исправить это, вам необходимо сообщить _writer, чтобы завершить работу до окончания основного процесса (и по завершении процесса закрытия Manager). У вас на самом деле уже есть механизм для этого, вы просто не используете его; отправьте None на номер outq, а _writer сделает упорядоченное выключение. Позвоните, чтобы до writer.join(), и все должно работать нормально.

+0

duh! конечно. Я думаю, что я думал, что очередь автоматически выдаст None, если там ничего нет, но, конечно, если вы вызовеете get() в пустой очереди, он просто ждет, чтобы что-то вернуть. Я добавил '' outq.put (None) '' перед '' pool.close() '', и теперь он отлично работает. – jramm

Смежные вопросы