7

Я хочу запустить параллельное вычисление на некоторые входные данные, которые загружаются из файла. (Файл может быть действительно большим, поэтому я использую генератор для этого.)многопроцессорность - чтение больших входных данных - зависание программы

На определенное количество элементов мой код работает нормально, но над этим порогом зависает программа (некоторые из рабочих процессов не заканчиваются).

Любые предложения? (Я запускаю это с помощью python2.7, 8 процессоров, 5000 строк все еще в порядке, 7 500 не работает.)

Во-первых, вам нужен входной файл. Сформировать его в Баш:

for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done 

Затем запустите это:

python2.7 main.py 100 counter.txt > run_log.txt 

main.py: не

#!/usr/bin/python2.7 
import os, sys, signal, time 
import Queue 
import multiprocessing as mp 

def eat_queue(job_queue, result_queue): 
    """Eats input queue, feeds output queue 
    """ 
    proc_name = mp.current_process().name 
    while True: 
     try: 
      job = job_queue.get(block=False) 
      if job == None: 
       print(proc_name + " DONE") 
       return 
      result_queue.put(execute(job)) 
     except Queue.Empty: 
      pass  

def execute(x): 
    """Does the computation on the input data 
    """ 
    return x*x 

def save_result(result): 
    """Saves results in a list 
    """ 
    result_list.append(result) 

def load(ifilename): 
    """Generator reading the input file and 
     yielding it row by row 
    """ 
    ifile = open(ifilename, "r") 
    for line in ifile: 
     line = line.strip() 
     num = int(line) 
     yield (num) 
    ifile.close() 
    print("file closed".upper()) 

def put_tasks(job_queue, ifilename): 
    """Feeds the job queue 
    """ 
    for item in load(ifilename): 
     job_queue.put(item) 
    for _ in range(get_max_workers()): 
     job_queue.put(None) 

def get_max_workers(): 
    """Returns optimal number of processes to run 
    """ 
    max_workers = mp.cpu_count() - 2 
    if max_workers < 1: 
     return 1 
    return max_workers 

def run(workers_num, ifilename): 
    job_queue = mp.Queue() 
    result_queue = mp.Queue() 

    # decide how many processes are to be created 
    max_workers = get_max_workers() 
    print "processes available: %d" % max_workers 
    if workers_num < 1 or workers_num > max_workers: 
     workers_num = max_workers 

    workers_list = [] 
    # a process for feeding job queue with the input file 
    task_gen = mp.Process(target=put_tasks, name="task_gen", 
          args=(job_queue, ifilename)) 
    workers_list.append(task_gen) 

    for i in range(workers_num): 
     tmp = mp.Process(target=eat_queue, name="w%d" % (i+1), 
             args=(job_queue, result_queue)) 
     workers_list.append(tmp) 

    for worker in workers_list: 
     worker.start() 

    for worker in workers_list: 
     worker.join() 
     print "worker %s finished!" % worker.name 

if __name__ == '__main__': 
    result_list = [] 
    args = sys.argv 
    workers_num = int(args[1]) 
    ifilename = args[2] 
    run(workers_num, ifilename) 

ответ

8

Это происходит потому, что ничто в вашем коде принимает ничего offresult_queue. Затем поведение зависит от внутренних буферизаций очереди: если «не много» данных ожидает, все выглядит нормально, но если «много» данных ожидает, все зависает. Не намного больше, можно сказать, потому что она включает в себя слои внутренней магии ;-) Но документы действительно предупредить об этом:

Предупреждение

Как уже упоминалось выше, если дочерний процесс поставил элементы на очереди (и он не использовал JoinableQueue.cancel_join_thread), тогда этот процесс не завершится, пока все буферизованные элементы не будут сброшены в канал.

Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете получить тупик, если не уверены, что все предметы, которые были помещены в очередь, были уничтожены. Аналогично, если дочерний процесс не является демоническим, тогда родительский процесс может зависать при выходе, когда он пытается объединить всех своих не-демонических детей.

Обратите внимание, что в этой очереди, созданной с использованием диспетчера, нет этой проблемы. См. Руководство по программированию.

Один простой способ восстановить, что: Во-первых добавить

  result_queue.put(None) 

перед тем eat_queue() возвращается. Затем добавьте:

count = 0 
while count < workers_num: 
    if result_queue.get() is None: 
     count += 1 

перед основной программой .join() с рабочими. Это истощает очередь результатов, и тогда все прекращается.

Кстати, этот код довольно странно:

while True: 
    try: 
     job = job_queue.get(block=False) 
     if job == None: 
      print(proc_name + " DONE") 
      return 
     result_queue.put(execute(job)) 
    except Queue.Empty: 
     pass 

Почему вы делаете неблокирующая get()? Это превращается в «цикл занятости» CPU-hog, пока очередь пуста. Первичной точкой .get() является предоставление эффективного способа подождать, пока работа не появится.Таким образом:

while True: 
    job = job_queue.get() 
    if job is None: 
     print(proc_name + " DONE") 
     break 
    else: 
     result_queue.put(execute(job)) 
result_queue.put(None) 

делает то же самое, но гораздо эффективнее.

Queue размера предосторожность

Вы не спрашивали об этом, но давайте рассмотрим его, прежде чем он кусает вас ;-) По умолчанию нет ограничения на размере Queue «s. Если, например, вы добавите миллиард элементов в Queue, он потребует достаточного количества ОЗУ для хранения миллиарда элементов. Поэтому, если ваш продюсер (ы) может генерировать рабочие элементы быстрее, чем ваш потребитель (и) может их обработать, использование памяти может быстро выйти из-под контроля.

К счастью, это легко восстановить: укажите максимальный размер очереди. Например,

job_queue = mp.Queue(maxsize=10*workers_num) 
         ^^^^^^^^^^^^^^^^^^^^^^^ 

Затем job_queue.put(some_work_item) будет блокировать до тех пор, пока потребители уменьшить размер очереди до менее чем максимум. Таким образом, вы можете обрабатывать огромные проблемы с очередью, которая требует тривиальной ОЗУ.

+0

Спасибо! Теперь программа работает правильно. Спасибо за все предложения. (Я действительно прочитал эту часть документации во время работы с вводом и не понял этого в случае очереди вывода.) Почему я использовал 'blocks = false': я просто скомпилировал схему кода из различных фрагментов кода , Но я не уверен, что он на самом деле делает - я запутался (даже в документах в этом случае): если это «ложь», то ребенок, который должен прочитать ввод, не ждет следующего пункта? (Если так, что происходит?) – galapah

+1

'block = False' означает, что вызов немедленно возвращается, независимо от того, было ли что-либо в очереди. В общем случае игнорируйте аргументы «block» и «timeout». Все эти вещи прекрасно используются без них для подавляющего большинства задач, и использование их * может привести вас к различным неприятностям. Требуется много опыта, чтобы знать, когда вам нужно * неблокирующий или тайм-аут, и даже тогда большинство людей этого не делают ;-) –

Смежные вопросы