2016-04-07 2 views
1

Вчера я разобрался с файлом журнала с примерно 20 миллионами строк с использованием многопроцессорности в python.Почему процесс не заканчивается в многопроцессе

  1. Запустите процесс с именем 'производитель', чтобы прочитать файл по строкам и поместить его в очередь.
  2. Запустите три процесса с именем «потребитель i», чтобы извлечь одну строку из очереди и проанализировать ее, чтобы получить ip.
  3. В основной функции я запускаю эти процессы и жду с помощью join().

код ниже

from multiprocessing import Process, Queue 
from Queue import Empty 
import os 
import time  


def put_ip(src, q, number): 
    """ 
    read file line by line, and put it to queue 
    """ 
    print "start put_ip: %d" % os.getpid() 
    with open(src) as f: 
     for line in f: 
      q.put(line) 
     for i in range(number): 
      q.put(EOFError) 
    print "stop put_ip" 


def get_ip(lock, src, result, index): 
    """ 
    fetch line, and extract ip from it 
    """ 
    print "start get_ip %d: %d" % (index, os.getpid()) 
    ips = [] 
    while True: 
     line = src.get() 
     if line == EOFError: 
      print "%d get EOFError" % index 
      break 
     else: 
      res = json.loads(line.strip()) 
      # process res, get ip 
      ips.append(ip) 
    print "get_ip %d get %d ips" % (os.getpid(), len(ips)) 
    result.put('\n'.join(ips)) 
    ips = [] 
    print "stop get_ip %d" % os.getpid() 
    return 


def test_get_ip(src, dest, number): 
    """ 
    test with single process 
    """ 
    srcq = Queue() 
    result = Queue() 

    with open(src) as f: 
     for line in f: 
      # if 'error' not in line: 
      srcq.put(line) 
     for i in range(number): 
      srcq.put(EOFError) 

    get_ip(srcq, result, 0) 


def main(src, dest, number): 
    """ 
    with multiprocess 
    """ 
    srcq = Queue() 
    result = Queue() 

    producer = Process(target=put_ip, args=(src, srcq, number)) 

    consumers = [Process(target=get_ip, args=(srcq, result, i)) for i in xrange(number)] 

    print 'start at %s' % time.asctime() 
    starttime = time.time() 

    producer.start() 
    for consumer in consumers: 
     consumer.start() 

    producer.join() 
    for consumer in consumers: 
     consumer.join() 

    with open(dest, 'w') as w: 
     while True: 
      try: 
       res = result.get_nowait() 
       w.write(res +'\n') 
      except Empty: 
       print 'Empty' 
       break 

    print "time: %f" % (time.time()-starttime) 

if __name__ == '__main__': 
    parser = argparse.ArgumentParser() 
    parser.add_argument('-i', dest='src', required=True) 
    parser.add_argument('-o', dest='dest', required=True) 
    parser.add_argument('-n', dest='number', type=int, default=2) 

    args = parser.parse_args() 

    main(args.src, args.dest, args.number) 
    # test_get_ip(args.src, args.dest, args.number) 

Результат является странным, что процесс потребителя не будет прекращен после проделанной работы, а основная функция блокируется на Join().

испытаний с различными suitations и перекодирует ниже:

  • использования test_get_ip() без многопроцессорных обрабатывать небольшой или большой файл журнала, он работает хорошо.
  • использовать main() с многопроцессорной обработкой для обработки большого лог-файла, он будет блокироваться при соединении(). каждый процесс get_ip будет печатать «stop get_ip XXXX», но не заканчивается.
  • использовать main() для обработки более мелкого лог-файла с 2000 рядами, он тоже хорошо работает. get_ip завершится.
  • Если i не храните ip в списке ips в get_ip(), он отлично работает с небольшим или большим файлом журнала.

Итак, в чем проблема? Имеет ли ограничение в списке? Я что-то пропустил?

Моя машина окружающая среда:

Linux 3.19.0-32-generiC#37~14.04.1-Ubuntu SMP Thu Oct 22 09:41:40 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux 
Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2 

Спасибо за вас время!

ответ

0

Я думаю, что ваша проблема в том, что вы буферизируете все данные и отправляете результат только тогда, когда потребители закончены.

Предположим, что процесс, выполняющийся get_ip(), собрал 1M IP-адреса в списке. Теперь, до завершения, необходимо до сериализовать все эти данные и передать их через Queue, а затем функция main() получит и deserialize все эти данные.

Мое предложение состоит в том, что вы сразу же помещаете IP-адреса в очередь результатов, а затем обрабатываете их и записываете их по мере их поступления.

+0

Как вы видите, в конце get_ip() я добавляю отладочную печать, ** печатаю «stop get_ip% d»% os.getpid() **. он будет отображаться на выходе. Я думаю ** result.put ** закончен. Правильно? Но почему процесс get_ip не заканчивается? –

+0

Возможно, они не будут завершены до тех пор, пока результаты не будут прочитаны из очереди. –

+0

Но почему с меньшим файлом журнала он заканчивается? Результаты считываются после прекращения всех потребительских процессов. –

Смежные вопросы