2015-10-14 2 views
-1

Я использую библиотеку многопроцессорности python для обработки информации в рамках набора процессов. Эти процессы также содержат процессы, которые далее делят объем работы, которая должна быть выполнена. Существует один Manager.Queue, который накапливает результаты всех процессов, которые потребляют данные.Процесс Python завершается до завершения ввода-вывода

В основном потоке скрипта python. Я попытался использовать соединение, чтобы заблокировать основной поток, пока мы не сможем разумно определить, были ли все подпроцессы завершены, а затем записать вывод в один файл. Однако система завершается, и файл закрывается до того, как все данные будут записаны в файл.

Следующий код представляет собой упрощенное извлечение реализации описанного выше решения. для очереди в inQueues: queue.join()

for p in processes: 
    p.join() 
print "At the end output has: " + str(out_queue.qsize()) + " records" 

with open("results.csv", "w") as out_file: 
    out_file.write("Algorithm,result\n") 
    while not out_queue.empty(): 
     res = out_queue.get() 
     out_file.write(res['algorithm'] + ","+res['result']+"\n") 
     out_queue.task_done() 
     time.sleep(0.05) 
    out_queue.join() 
    out_file.close() 

out_queue.qsize() напечатает избыток 500 записей доступны, однако, будут напечатаны только 100 в файл. Также на данный момент я не уверен на 100%, если 500 записей - это сумма, генерируемая системой, а только количество, указанное на этом этапе.

Как обеспечить, чтобы все результаты были записаны в файл results.csv?

+0

[QSize()] (http://bugs.python.org/issue17985): «Верните примерный размер очереди Из-за. многопоточность/многопроцессорная семантика, этот номер не является надежным. " – kay

+0

Я знаю, что размер очереди, обозначенный методом qsize, может измениться, однако раздел кода является единственной частью всей программы, которая удаляется из очереди, поэтому не ожидается, что количество напечатанных записей будет быть меньше размера очереди (что и происходит в настоящее время). – kyleED

ответ

0

Не ждите, все процессы, чтобы закончить, прежде чем потреблять данные, но обрабатывать данные в то же время и запоминать, какие процессы все еще работают:

processes = [] 

"""start processes and append them to processes""" 

while True: 
    try: 
     # get an item 
     item = queue.get(True, 0.5) 
    except Queue.Empty: 
     # no item received in half a second 
     if not processes: 
      # there are no more processes and nothing left to process 
      break 
     else: 
      proc_num = 0 
      while proc_num < len(processes): 
       process = processes[proc_num] 
       exit_code = process.poll() 
       if exit_code is None: 
        # process is still running, proceed to next 
        proc_num += 1 
       elif exit_code == 0: 
        # process ended gracefully, remove it from list 
        processes.pop(proc_num) 
       else: 
        # process ended with an error, what now? 
        raise Exception('Her last words were: "%r"' % exit_code) 
    else: 
     # got an item 
     """process item""" 

Не проверять, если processes пуст вне Queue.Empty или у вас будет races.

Но, вероятно, вы будете счастливее с higher level function:

pool = multiprocessing.Pool(8) 
items = pool.map_async(producer_function, producer_arguments) 
for item in items: 
    """process item""" 
Смежные вопросы