2013-06-10 7 views
0

Пожалуйста, ознакомьтесь с основного примеромPython Сохранения процесса (закрытие обработчик файлов)

def queue_add(): 
    #this will exit when all lines from the 
    #file are added in the Queue 
    with open('get_from.txt') as f: 
     for line in f: 
      q.add(line.strip()) 

def queue_save(): 
    #when to trigger save_to.close()?!?! 
    save_to = open('save_to.txt') 
    while True: 
     data = q.get() #this functions blocks if `q` is empty, so how to know 
         #when to close the file handler `save_to`?? 
     save_to.write(data) 

def worker(): 
    #this is daemon process 
    while True: 
     #work with file from 
     get = q.get() 
     #work with data 
     q_done.put('processed data') 
     q.task_done() 

q = Queue() 
q_done = Queue() 
#starts the processes here.. 

Так что мой вопрос, как знать queue_save() обработали и сохранены все данные из done_q и закрыть его записи file_handler?

ответ

1

Как насчет использования дозорного значения.

import Queue 
import threading 

END_OF_DATA = object() # sentinel 

def queue_add(q): 
    with open('get_from.txt') as f: 
     for line in f: 
      q.put(line.strip()) 

def process(x): 
    # dummy 
    return str(len(x)) + '\n' 

def worker(q_in, q_out): 
    while True: 
     data = q_in.get() 
     if data is END_OF_DATA: 
      break 
     q_out.put(process(data)) 
     q_in.task_done() 

def queue_save(q): 
    save_to = open('save_to.txt', 'w') 
    while True: 
     data = q.get() 
     if data is END_OF_DATA: 
      break 
     save_to.write(data) 
     q.task_done() 
    save_to.close() 


q1 = Queue.Queue() 
q2 = Queue.Queue() 

n_workers = 4 
t1 = threading.Thread(target=queue_add, args=(q1,)) 
workers = [ 
    threading.Thread(target=worker, args=(q1, q2,)) 
    for i in range(n_workers) 
] 
t3 = threading.Thread(target=queue_save, args=(q2,)) 

t1.start() 
for worker in workers: worker.start() 
t3.start() 

t1.join() 

q1.join() 
for worker in workers: q1.put(END_OF_DATA) 

for worker in workers: worker.join() 

q2.join() 
q2.put(END_OF_DATA) 
t3.join() 

EDIT: синхронизировать нескольких работников.

+2

Если 'None' является допустимыми данными, вы должны использовать' END_OF_DATA = object() ', который гарантирует, что' data END_OF_DATA' является истинным тогда и только тогда, когда данные действительно являются дозорным значением. – Bakuriu

+0

'data = q_in.get()' data не может быть None, если в блоке Queue нет блоков, оставшихся в очереди Queue.get(). – nacholibre

+0

@nacholibre q_in.get() может быть None, потому что производитель (queue_add, worker) помещает None. – falsetru

Смежные вопросы