Я не знаю точно, какую библиотеку вы используете, и когда вы говорите: «Это построение очереди ...» Я понятия не имею, о чем «это» вы имеете в виду ... но очевидным ответом является придерживаться ваш собственный queue перед тем, что он использует, поэтому вы можете напрямую манипулировать этой очередью. Например:
import queue
import threading
def skip_get(q):
value = q.get(block=True)
try:
while True:
value = q.get(block=False)
except queue.Empty:
return value
q = queue.Queue()
def file_event_callback(event):
# code 256 for adding file to folder
if event.mask == 256:
fileChanged = event.name
q.put(fileChanged)
def consumer():
while True:
fileChanged = skip_get(q)
if fileChanged is None:
return
# do stuff with fileChanged
Теперь, прежде чем запустить обозреватель, сделайте следующее:
t = threading.Thread(target=consumer)
t.start()
И в конце:
observer.join()
q.put(None)
t.join()
Итак, как это работает ?
Прежде всего, давайте посмотрим на сторону потребителя. Когда вы вызываете q.get()
, это выдает первое сообщение из очереди. Но что, если ничего не будет? Для этого нужен аргумент block
. Если это неверно, get
вызовет исключение queue.Empty
. Если это правда, get
будет ждать вечно (в потокобезопасном режиме), пока что-то не появится. Итак, блокируя один раз, мы обрабатываем случай, когда еще ничего не читать. К тому времени цикл без блокировки, мы потребляем что-нибудь еще в очереди, чтобы обрабатывать случай, когда слишком много вещей для чтения. Поскольку мы продолжаем переназначать value
тем, что мы выскочили, то, что мы заканчиваем, - это последнее, что помещено в очередь.
Теперь давайте посмотрим на сторону производителя. Когда вы звоните q.put(value)
, это просто помещает value
в очередь. Если вы не установили ограничение по размеру в очереди (чего у меня нет), это не может быть заблокировано, поэтому вам не нужно беспокоиться об этом. Но теперь, как вы сигнализируете поток потребителей, что вы закончили? Он будет ждать в q.get(block=True)
навсегда; единственный способ разбудить его - дать ему некоторую ценность поп-музыке. Путем нажатия на контрольное значение (в этом случае None
в порядке, так как оно недействительно в качестве имени файла), и заставляя потребителя обращаться с этим None
, прекратив работу, мы предоставим нам отличный, чистый способ отключения. (И потому, что мы никогда ничего не нажимаем после None
, нет никаких шансов случайно пропустить его.) Итак, мы можем просто нажать None
, а затем убедитесь, что (за исключением любых других ошибок) потребительский поток в конечном итоге уйдет, что означает, что мы можем сделать t.join()
подождать, пока он это сделает, не опасаясь тупика.
Я упомянул выше, что вы можете сделать это проще всего с Condition
.Если вы думаете о том, как работает очередь, это просто список (или deque или что-то еще), защищенное условием: потребитель ждет этого состояния, пока не появится что-то, и производитель сделает что-то доступным, добавив его в список и сигнализируя о состоянии. Если вы только хотите получить последнее значение, в списке нет причин. Таким образом, вы можете сделать это:
class OneQueue(object):
def __init__(self):
self.value = None
self.condition = threading.Condition()
self.sentinel = object()
def get(self):
with self.condition:
while self.value is None:
self.condition.wait()
value, self.value = self.value, None
return value
def put(self, value):
with self.condition:
self.value = value
self.condition.notify()
def close(self):
self.put(self.sentinel)
(Потому что я теперь с помощью None
сигнализировать, что ничего не доступно, мне пришлось создать отдельную часовой сигнал, что мы сделали.)
В проблема с этим дизайном заключается в том, что если производители ставят несколько значений, в то время как потребитель слишком занят, чтобы справиться с ними, он может пропустить некоторые из них, но в этом случае эта «проблема» именно то, что вы искали.
Тем не менее, использование инструментов нижнего уровня всегда означает, что есть намного больше, чтобы ошибиться, и это особенно опасно с синхронизацией потоков, поскольку это связано с трудностями, которые трудно обернуть вокруг, и трудно отлаживать, даже когда вы поймите их, так что вам может быть лучше использовать Queue
в любом случае.
Что это за 'fsevents'? Единственная библиотека, которую я знаю под этим именем, - это та, которая перестала работать надежно с OS X 10.5. Также есть привязки PyObjC и, по крайней мере, две другие автономные обертки (плюс различные кросс-платформенные вещи), но я не думаю, что любой из них называется просто 'fsevents'. – abarnert
Кроме того, «я сначала попытался использовать сторожевой таймер, но поскольку это нужно запустить на Mac» ... если вы имеете в виду [эту библиотеку] (https://pypi.python.org/pypi/watchdog/), в последний раз я смотрел, как это работает отлично на Mac (с использованием 'select.kqueue', родных' kqueue', PyObjC FSEvents или одной из других сторонних оберток, в зависимости от того, что вы установили). Какие проблемы у вас есть, и какие бэкэнд вы используете? – abarnert
Когда я пытался сделать сторожевую работу, я нашел эту страницу: https://pypi.python.org/pypi/MacFSEvents – Martin