2015-04-06 5 views
1

Я даю Python (3.4) попытку с многопоточным чтением и задаю вопрос по следующему коду.Тупик при вызове task_done(), когда задача была вытащена новым потоком

Код работает хорошо, когда у меня больше работы, чем NUM_WORKER_THREADS; однако после того, как очередь сжимается до менее чем , новые итерации могут принимать один и тот же элемент из-за времени между items.get() и вызовом task_done. Это приводит к тупику при вызове task_done.

Каков правильный способ справиться с этим?

import time 
import threading 

from queue import Queue 
NUM_WORKER_THREADS = 8 

def worker(): 
    try: 
     while items.qsize() > 0: 
      print("{} items left to process".format(items.qsize())) 
      item = items.get()     
      print("Processing {}".format(item)) 
      itemrec = getItemRecord(item) # external call to webservice ~3 second response.    
      items.task_done() 

    except Exception as inst: 
     print("---------------EXCEPTION OCCURRED----------------") 
     print(type(inst)) 
     print(inst.args) 
     print(inst) 


# start counter to monitor performance 
start = time.perf_counter() 

items = Queue() 
# get the items we need to work on for allocations 
searchResults = getSearchResults() # external call to webservice 

# add results of search to a collection 
for itemid in searchResults: 
    if itemid['recordtype'] == 'inventoryitem': 
     items.put(itemid['id']) 

for i in range(NUM_WORKER_THREADS): 
    try: 
     t = threading.Thread(target=worker) 
     t.daemon = True 
     t.start() 
    except Exception as inst: 
     print("---------------EXCEPTION OCCURRED----------------") 
     print(type(inst)) 
     print(inst.args) 
     print(inst)    

items.join() 

# print end of execution performance counter 
print('time:', time.perf_counter() - start) 

ответ

2

Я хотел бы использовать часовой сказать рабочим, чтобы закрыть, когда нет больше рабочих элементов для обработки, а не полагаться на Queue размере, который является восприимчивым к условиям гонки:

import time 
import threading 

from queue import Queue 
NUM_WORKER_THREADS = 8 

def worker(): 
    for item in iter(items.get, None): 
     try: 
      print("{} items left to process".format(items.qsize())) 
      print("Processing {}".format(item)) 
     except Exception as inst: 
      print("---------------EXCEPTION OCCURRED----------------") 
      print(type(inst)) 
      print(inst.args) 
      print(inst) 
     finally: 
      items.task_done() 
    print("Got sentinel, shut down") 
    items.task_done() 

# start counter to monitor performance 
start = time.perf_counter() 

items = Queue() 
# get the items we need to work on for allocations 
searchResults = getSearchResults() # external call to webservice 

# add results of search to a collection 
for itemid in searchResults: 
    if itemid['recordtype'] == 'inventoryitem': 
     items.put(itemid['id']) 

for _ in range(NUM_WORKER_THREADS): 
    items.put(None) # Load a sentinel for each worker thread 

for i in range(NUM_WORKER_THREADS): 
    try: 
     t = threading.Thread(target=worker) 
     t.daemon = True 
     t.start() 
    except Exception as inst: 
     print("---------------EXCEPTION OCCURRED----------------") 
     print(type(inst)) 
     print(inst.args) 
     print(inst)    

items.join() 

# print end of execution performance counter 
print('time:', time.perf_counter() - start) 

Также обратите внимание, что вы можете использовать встроенный в пул потоков, предоставленной Python (multiprocessing.dummy.Pool), чтобы сделать это более элегантно:

import time 
from multiprocessing.dummy import Pool # Thread Pool 

NUM_WORKER_THREADS = 8 

def worker(item): 
    try: 
     print("Processing {}".format(item)) 
     itemrec = getItemRecord(item) # external call to webservice ~3 second response.    
    except Exception as inst: 
     print("---------------EXCEPTION OCCURRED----------------") 
     print(type(inst)) 
     print(inst.args) 
     print(inst) 

# start counter to monitor performance 
start = time.perf_counter() 

# get the items we need to work on for allocations 
searchResults = getSearchResults() # external call to webservice 
pool = Pool(NUM_WORKER_THREADS) 
pool.map(worker, [item['id'] for item in searchResults 
        if item['recordtype'] == 'inventoryitem']) 
pool.close() 
pool.join() 


# print end of execution performance counter 
print('time:', time.perf_counter() - start) 
+0

Спасибо - я дам, что спина. –

Смежные вопросы