2010-04-16 2 views
2

Предположим, что я застрял с использованием Python 2.6 и не могу обновить (даже если это поможет). Я написал программу, которая использует класс Queue. Мой производитель - это простой список каталогов. Мои потребительские потоки вытаскивают файл из очереди и делают с ним все. Если файл уже обработан, я пропущу его. Обработанный список генерируется до начала всех потоков, поэтому он не пуст.Использование класса Queue в Python 2.6

Вот несколько псевдокодов.

import Queue, sys, threading 

processed = [] 

def consumer(): 
    while True: 
     file = dirlist.get(block=True) 
     if file in processed: 
      print "Ignoring %s" % file 
     else: 
      # do stuff here 
     dirlist.task_done() 

dirlist = Queue.Queue() 

for f in os.listdir("/some/dir"): 
    dirlist.put(f) 

max_threads = 8 

for i in range(max_threads): 
    thr = Thread(target=consumer) 
    thr.start() 

dirlist.join() 

странное поведение я получаю то, что если поток встречает файл, который уже был обработан, поток киосков, и ждет, пока вся программа не заканчивается. Я немного поработал над тестированием, и первые 7 потоков (при условии, что 8 является максимальным) останавливаются, а восьмой поток продолжает обрабатывать один файл за раз. Но, делая это, я теряю всю причину для потоковой передачи приложения.

Я что-то не так, или это ожидаемое поведение классов Queue/threading в Python 2.6?

+1

Должно быть что-то не так - как можно было бы ожидать, что остановка будет зависеть от теста, полностью не связанного с очередью ?!Но я не думаю, что это в этом коде, несмотря на его недостатки (потоки не-демона, злоупотребление встроенным именем 'file', ...) - я не думаю, что это может заставить замолчать потоки! Скорее, как обработано заполнено, и изменено ли оно в части «...» (что может быть проблемой, поскольку вокруг нет блокировки)? Можете ли вы воспроизвести эту проблему с тривиальной популяцией обработанных (например, поместите туда половину файлов) и тривиальным «...», например «print file»? –

ответ

1

Поскольку эта проблема проявляется только при поиске файла, который уже был обработан, похоже, что это как-то связано с самим списком processed. Вы пробовали реализовать простой замок? Например:

processed = [] 
processed_lock = threading.Lock() 

def consumer(): 
    while True: 
     with processed_lock.acquire(): 
      fileInList = file in processed 
     if fileInList: 
      # ... et cetera 

резьбы имеет тенденцию вызывать странные ошибки, даже если они кажутся, что они «не должны» случиться. Использование блокировок для общих переменных - это первый шаг, чтобы убедиться, что вы не закончите какое-то состояние гонки, которое может привести к тупику потоков.


Конечно, если то, что вы делаете под # do stuff here является ресурсоемким, то Python будет работать только код из одного потока в то время, в любом случае, из-за глобальным интерпретатор Lock. В этом случае вы можете переключиться на модуль multiprocessing - он очень похож на threading, хотя вам нужно будет заменить общие переменные другим решением (подробнее см. here).

+0

Я не думал о блокировке обработанного списка. Теперь, когда я думаю об этом, это определенно имеет смысл. И спасибо за рекомендацию использовать модуль многопроцессорности. – Pat

2

Я попытался запустить ваш код и не видел описанного вами поведения. Однако программа никогда не выходит. Я рекомендую изменения .get() вызова следующим образом:

try: 
     file = dirlist.get(True, 1) 
    except Queue.Empty: 
     return 

Если вы хотите знать, какой поток в настоящее время выполняется, вы можете импортировать thread модуль и печать thread.get_ident().

я добавил следующую строку после .get():

print file, thread.get_ident() 

и получили следующие результаты:

bin 7116328 
cygdrive 7116328 
cygwin.bat 7149424 
cygwin.ico 7116328 
dev etc7598568 
7149424 
fix 7331000 
home 7116328lib 
7598568sbin 
7149424Thumbs.db 
7331000 
tmp 7107008 
usr 7116328 
var 7598568proc 
7441800 

Выход грязно, поскольку потоки пишут на стандартный вывод в то же самое время. Многообразие идентификаторов потоков также подтверждает, что все потоки запущены.

Возможно, что-то не так в реальном коде или вашей методологии тестирования, но не в коде, который вы опубликовали?

+0

А, это объясняет, почему мне пришлось продолжать убивать программу. Спасибо за совет. – Pat

+0

@voipme Спасибо, хорошо. Активировать голосование лучше. :-) –

Смежные вопросы