Вчера я разобрался с файлом журнала с примерно 20 миллионами строк с использованием многопроцессорности в python.Почему процесс не заканчивается в многопроцессе
- Запустите процесс с именем 'производитель', чтобы прочитать файл по строкам и поместить его в очередь.
- Запустите три процесса с именем «потребитель i», чтобы извлечь одну строку из очереди и проанализировать ее, чтобы получить ip.
- В основной функции я запускаю эти процессы и жду с помощью join().
код ниже
from multiprocessing import Process, Queue
from Queue import Empty
import os
import time
def put_ip(src, q, number):
"""
read file line by line, and put it to queue
"""
print "start put_ip: %d" % os.getpid()
with open(src) as f:
for line in f:
q.put(line)
for i in range(number):
q.put(EOFError)
print "stop put_ip"
def get_ip(lock, src, result, index):
"""
fetch line, and extract ip from it
"""
print "start get_ip %d: %d" % (index, os.getpid())
ips = []
while True:
line = src.get()
if line == EOFError:
print "%d get EOFError" % index
break
else:
res = json.loads(line.strip())
# process res, get ip
ips.append(ip)
print "get_ip %d get %d ips" % (os.getpid(), len(ips))
result.put('\n'.join(ips))
ips = []
print "stop get_ip %d" % os.getpid()
return
def test_get_ip(src, dest, number):
"""
test with single process
"""
srcq = Queue()
result = Queue()
with open(src) as f:
for line in f:
# if 'error' not in line:
srcq.put(line)
for i in range(number):
srcq.put(EOFError)
get_ip(srcq, result, 0)
def main(src, dest, number):
"""
with multiprocess
"""
srcq = Queue()
result = Queue()
producer = Process(target=put_ip, args=(src, srcq, number))
consumers = [Process(target=get_ip, args=(srcq, result, i)) for i in xrange(number)]
print 'start at %s' % time.asctime()
starttime = time.time()
producer.start()
for consumer in consumers:
consumer.start()
producer.join()
for consumer in consumers:
consumer.join()
with open(dest, 'w') as w:
while True:
try:
res = result.get_nowait()
w.write(res +'\n')
except Empty:
print 'Empty'
break
print "time: %f" % (time.time()-starttime)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-i', dest='src', required=True)
parser.add_argument('-o', dest='dest', required=True)
parser.add_argument('-n', dest='number', type=int, default=2)
args = parser.parse_args()
main(args.src, args.dest, args.number)
# test_get_ip(args.src, args.dest, args.number)
Результат является странным, что процесс потребителя не будет прекращен после проделанной работы, а основная функция блокируется на Join().
испытаний с различными suitations и перекодирует ниже:
- использования test_get_ip() без многопроцессорных обрабатывать небольшой или большой файл журнала, он работает хорошо.
- использовать main() с многопроцессорной обработкой для обработки большого лог-файла, он будет блокироваться при соединении(). каждый процесс get_ip будет печатать «stop get_ip XXXX», но не заканчивается.
- использовать main() для обработки более мелкого лог-файла с 2000 рядами, он тоже хорошо работает. get_ip завершится.
- Если i не храните ip в списке ips в get_ip(), он отлично работает с небольшим или большим файлом журнала.
Итак, в чем проблема? Имеет ли ограничение в списке? Я что-то пропустил?
Моя машина окружающая среда:
Linux 3.19.0-32-generiC#37~14.04.1-Ubuntu SMP Thu Oct 22 09:41:40 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
Python 2.7.6 (default, Jun 22 2015, 17:58:13)
[GCC 4.8.2] on linux2
Спасибо за вас время!
Как вы видите, в конце get_ip() я добавляю отладочную печать, ** печатаю «stop get_ip% d»% os.getpid() **. он будет отображаться на выходе. Я думаю ** result.put ** закончен. Правильно? Но почему процесс get_ip не заканчивается? –
Возможно, они не будут завершены до тех пор, пока результаты не будут прочитаны из очереди. –
Но почему с меньшим файлом журнала он заканчивается? Результаты считываются после прекращения всех потребительских процессов. –