2011-12-28 3 views
2

Я пытаюсь установить что-то там, где один поток пишет список работ, а другой поток читает список и работает с ним. Этот список может быть очень большим, чтобы остановить этот список в памяти. Я хочу, чтобы он был написан в файле (или в любом случае - для сохранения генераторов памяти?).Работа в очереди, без использования большого количества памяти

Я собрал немного бегущего примера со сном в писателе, чтобы читатель мог догнать. Мне интересно, как я могу заставить читателя не останавливаться, когда он «настигает» писателя. Я посмотрел на использование .seek и .tell, но у меня появилось странное поведение, и я не уверен, что это правильный маршрут.

Как еще один вопрос, разве это вообще разумная идея? Возможно, есть намного более элегантный способ, которым я могу поставить в очередь список строк без использования большого количества памяти.

import threading,time 

class Writer(threading.Thread): 

    lock= threading.Lock() 

    def __init__(self,file_path,size): 
    threading.Thread.__init__(self) 
    self.file_path= file_path 
    self.size= size 
    self.i=0 

    def how_many(self): 
    with self.lock: 
     print "Reader starting, writer is on",self.i 

    def run(self): 
    f=open(self.file_path,"w") 
    for i in xrange(self.size): 
     with self.lock: 
     self.i=i 
     if i%1000==0: 
     time.sleep(0.1) 
     f.write("%s\n"%i) 
    f.close() 

class Reader(threading.Thread): 

    def __init__(self,file_path): 
    threading.Thread.__init__(self) 
    self.file_path= file_path 

    def run(self): 
    f=open(self.file_path,"r") 
    line=0 
    for line in f: 
     pass 
    print "Reader got to: %s"%line.strip() 


if __name__ == "__main__": 
    a= Writer("testfile",2000000) 
    b= Reader("testfile") 
    a.start() 
    time.sleep(1) 
    a.how_many() 
    b.start() 
+0

Вы заглянули в [трубы] (http://stackoverflow.com/questions/1430446/create-a-temporary-fifo-named-pipe-in-python)? –

+0

Выглядит многообещающе, но это UNIX только правильно? Мне нужно что-то независимое от платформы – GP89

+0

. Проверьте [здесь] (http://eli.thegreenplace.net/2011/12/27/python-threads-communication-and-stopping/) для примера рабочих потоков и очередей в python. Класс Queue.Queue обрабатывает кросс-платформу fifo. Возможно, вам придется подклассифицировать очередь для записи записей во временный файл для вашей памяти. –

ответ

2

я решить эту проблему, используя буферный-файл-очередь, где очередь распределяются между памятью и файлом. Элементы помещаются в очередь, но если элементы в очереди превышают указанный размер очереди, любое переполнение будет храниться в файле для сохранения памяти и будет get из очереди одинаково

Если кто-то хочет что-то сделать подобный я положил его на github here

0

Для отправки сообщений между потоками в Queue класс очень удобно. Импортируйте его с помощью from Queue import Queue, постройте его и передайте объект очереди каждому потоку. Он поддерживает несколько производителей и потребителей, и вы можете поместить большинство объектов Python в очередь - списки, объекты, итераторы и т. Д.

Для передачи большого количества данных с использованием этой очереди просто напишите один объект за раз в очередь и использовать функцию генератора у потребителя, которая выводит данные из очереди. Очереди поддерживают ограничение глубины, если производитель быстрее, чем потребитель.

+0

Я использую Очереди в других частях программы, под Очередью - только список, поэтому все, что я вставляю, все равно останется в памяти правильно? – GP89

+0

Если вы положили все сообщения в очередь перед чтением любого из них, то да. Но ваши потоки производителей и потребителей могут писать и читать одновременно, а класс «Queue» обрабатывает блокировку и другие проблемы. Кроме того, вы можете установить максимальную глубину, если производитель быстрее, чтобы ограничить использование памяти. – wberry

+0

Если несколько сообщений находятся в памяти в любой момент времени, все в порядке. Не стоит использовать диск в качестве очереди, чтобы сохранить несколько объектов. Только когда много и много из них сразу в памяти, у вас есть настоящая проблема. – wberry

0

Класс многопроцессорности JoinableQueue предназначен для ограничения объема отставания, которое может возникать при ожидании дочерних потоков/процессов для использования задач. Я собираюсь предположить, что вы читаете работу из файла, и что файл слишком велик, чтобы легко хранить в памяти все сразу.

Ниже приведена моя попытка решения, которое должно ограничивать использование памяти. В этом примере я обрабатываю ряд строк, завершающих новую строку, преобразовывая их в стандартный формат и записывая их обратно в новый файл.

Я ни в коем случае не специалист по многопроцессорному модулю, поэтому кто-нибудь видит ошибку/лучший способ сделать это, я хотел бы услышать это.

from multiprocessing import Process, Queue, JoinableQueue 
import time 

date_formats = [ 
    "%Y%m", 
    "%Y-%m-%d", 
    "%y-%m-%d", 
    "%y%m%d", 
    "%Y%m%d", 
    "%m/%d/%Y", 
    "%m/%d/%y", 
    "%m/%d/%Y %H:%M", 
    "%m%d%y", 
    "%m%d%Y", 
    "%B, %d %Y", 
    "%B, %d %y", 
    "%d %B, %Y", 
    "%d %B, %y", 
    "%B %d %Y", 
    "%B %d %y", 
    "%B %d, %Y", 
    "%B %d, %y", 
    "%B %d %Y", 
    "%B %d %y", 
    "%b %d %Y", 
    "%b %d, %Y", 
    "%b %d %y", 
    "%b %d, %y", 
    "%d-%b-%y", 
    "%Y-%m-%d %H:%M:%S" 
] 

def convert_date(date): 
    date = date.strip() 
    for dateformat in date_formats: 
     try: 
      converted = time.strptime(date, dateformat) 
      converted = time.strftime("%Y-%m-%d", converted) 
      return converted 
     except ValueError: 
      continue 

def writer(result_queue): 
    f = open("iso_dates.out", "wb") 
    while True: 
     try: 
      date = result_queue.get(timeout=1) 
      f.write(date + '\n') 
     except: 
      break  
    f.close() 

def worker(work_queue, result_queue): 
    while True: 
     date = work_queue.get() 

     if not date: 
      break 

     result_queue.put(convert_date(date)) 
     work_queue.task_done() 

dates  = open("dates.out", "rb") 
work_queue = JoinableQueue(512) #allow no more than 512 items on queue 
result_queue = Queue() 
writer_proc = Process(target=writer, args=(result_queue,)) 
worker_procs = 2 

for i in range(worker_procs): 
    p = Process(target=worker, args=(work_queue, result_queue)) 
    p.daemon = True 
    p.start() 

writer_proc.start() 
for date in dates: 
    work_queue.put(date) #will block until tasks are consumed if maxsize is encountered 

work_queue.join() 
dates.close() 
Смежные вопросы