2012-05-21 6 views
1

Когда скорость queue.put() быстрее, чем скорость queue.get(), я считаю, что процесс P1 будет использовать большую память (поскольку P1 постоянно помещает строку из большого текстового файла) , Даже P2 закончил получать линию из очереди впоследствии. Память, которая использовалась P1, еще не выпущена. Как исправить эту проблему? Ниже приведен образец и тестовый код.Очередь многопроцессорности не освобождает память

Спасибо!

import time 
from multiprocessing import Process, Queue 

def addline(q): 
    f = file('a big text file','r') 
    line = True 
    while line: 
     line = f.readline() 
     q.put(line, False) 
    f.close() 
    print "P1:finished" 
    while 1: 
     time.sleep(2) 

def getline(q): 
    f = file('/tmp/bak','w') 
    line = True 
    while line: 
     line=q.get() 
     f.write(line) 
     time.sleep(0.01) 
    f.close() 
    print "p2:finished" 



if __name__ == "__main__": 
    q = Queue() 
    p1 = Process(name="addline", target=addline, args=(q,)) 
    p2 = Process(name="getline", target=getline, args=(q,)) 
    p1.start() 
    p2.start() 

Edit: Я пытаюсь прочитать текстовый файл (44MB) и наблюдать/Proc/PID/smaps. Я обнаружил, что память, которая не была выпущена, становится Private_Dirty в куче.

00fb3000-04643000 rw-p 00000000 00:00 0         [heap] 
Size:    55872 kB 
Rss:    55844 kB 
Pss:    55163 kB 
Shared_Clean:   0 kB 
Shared_Dirty:  1024 kB 
Private_Clean:   0 kB 
Private_Dirty:  54820 kB 
Referenced:  54972 kB 
Swap:     0 kB 
KernelPageSize:  4 kB 
MMUPageSize:   4 kB 

+0

Что вы делаете с 'while 1: q.get()' в 'getline()'? –

+0

В этом тестовом примере я просто хочу сохранить P2. Я удалил его. Благодарю. – jess

+1

Рассматривали ли вы установку лимита для своей очереди? Если процесс чтения не может идти в ногу, тогда без ограничения все содержимое должно буферизироваться в очереди. – mata

ответ

1

сборщик мусора Пайтона удаляет объект, как только на него не ссылаются больше. Пока скорость записи может идти в ногу со скоростью чтения вашего оборудования для хранения, чтение содержимого файла при одновременном написании его из двух независимых потоков/процессов должно быть возможным без увеличения объема памяти и небольшого объема памяти. Я считаю, что ваша проблема исчезает, когда вы используете языковые конструкции Python, которые более подходят для вашего варианта использования. Я попытаюсь прокомментировать это.

Для чтения файла строка за строкой, вы должны использовать следующую концепцию:

with open('filepath') as f: 
    for line in f: 
     do_something_with(line) 

Вы не должны явно .close() файл тогда. То же самое относится к написанию файла. Читайте о with заявлении здесь: http://effbot.org/zone/python-with-statement.htm

С моей точки зрения, для случая использования вы представили, multiprocessing.Pipe вместо multiprocessing.Queue будет более подходящим из-за «потоковый как» приложение. Кажется странным представлять содержимое сырого файла как элементы в очереди. Кроме того, вы можете избавиться от множества коммуникационных издержек, если вы будете использовать потоки вместо независимых подпроцессов (тогда вы должны использовать для межпоточной связи) **. В любом случае после запуска их необходимо выполнить join() потоки и подпроцессы.

** Для вашего использования (копирование файла) блокировка глобального интерпретатора (GIL) не будет проблемой производительности.

+0

Спасибо за ваш ответ !! В моем реальном случае я должен собрать много источников. Чтобы избежать проблемы с GIL, я использую многопроцессорность и многопроцессорность. Queue. Что касается чтения файла, я использовал генератор. После Queue.get P2 будет анализировать его, поэтому P2 работает медленнее, чем P1. И тогда P1 будет использовать память и не отпускать ее. Я не знаю, как это исправить. – jess

+0

Если анализ данных значительно медленнее, чем чтение файла, и вы не хотите иметь большой буфер (т. Е. Растущее использование памяти), нет другого выбора, кроме реализации механизма обратной связи: читателю требуется некоторое указание, когда нужно приостановить чтение данных , –

+0

В обычных условиях анализатор достаточно быстр, поэтому считыватель не использует память. Но, боюсь, есть некоторые исключения. И мне интересно, почему буфер не выпускается, даже читатель получил данные из буфера. Благодаря! – jess

Смежные вопросы