2014-09-23 2 views
1

В настоящее время я отслеживаю папку с использованием fsevents. Каждый раз, когда файл добавляется, в этом файле выполняется код. Новый файл добавляется в папку каждую секунду.Пропустить шаги в очереди fsevents

from fsevents import Observer, Stream 

def file_event_callback(event): 
    # code 256 for adding file to folder 
    if event.mask == 256: 
     fileChanged = event.name 
     # do stuff with fileChanged file 

if __name__ == "__main__": 
    observer = Observer() 
    observer.start() 
    stream = Stream(file_event_callback, 'folder', file_events=True) 
    observer.schedule(stream) 
    observer.join() 

Это работает очень хорошо. Единственная проблема заключается в том, что libary создает очередь для каждого файла, добавленного в папку. Код, выполняемый в файле file_event_callback, может занимать больше секунды. Когда это происходит, другие объекты в очереди должны быть пропущены так, чтобы использовался только самый новый.

Как я могу пропустить элементы из очереди, чтобы завершить только последнее дополнение к папке, используемой после последней?

Сначала я попытался использовать сторожевой таймер, но, поскольку это должно работать на mac, у меня были проблемы, которые делали это так, как я хотел.

+0

Что это за 'fsevents'? Единственная библиотека, которую я знаю под этим именем, - это та, которая перестала работать надежно с OS X 10.5. Также есть привязки PyObjC и, по крайней мере, две другие автономные обертки (плюс различные кросс-платформенные вещи), но я не думаю, что любой из них называется просто 'fsevents'. – abarnert

+0

Кроме того, «я сначала попытался использовать сторожевой таймер, но поскольку это нужно запустить на Mac» ... если вы имеете в виду [эту библиотеку] (https://pypi.python.org/pypi/watchdog/), в последний раз я смотрел, как это работает отлично на Mac (с использованием 'select.kqueue', родных' kqueue', PyObjC FSEvents или одной из других сторонних оберток, в зависимости от того, что вы установили). Какие проблемы у вас есть, и какие бэкэнд вы используете? – abarnert

+0

Когда я пытался сделать сторожевую работу, я нашел эту страницу: https://pypi.python.org/pypi/MacFSEvents – Martin

ответ

0

Я не знаю точно, какую библиотеку вы используете, и когда вы говорите: «Это построение очереди ...» Я понятия не имею, о чем «это» вы имеете в виду ... но очевидным ответом является придерживаться ваш собственный queue перед тем, что он использует, поэтому вы можете напрямую манипулировать этой очередью. Например:

import queue 
import threading 

def skip_get(q): 
    value = q.get(block=True) 
    try: 
     while True: 
      value = q.get(block=False) 
    except queue.Empty: 
     return value 

q = queue.Queue() 

def file_event_callback(event): 
    # code 256 for adding file to folder 
    if event.mask == 256: 
     fileChanged = event.name 
     q.put(fileChanged) 

def consumer(): 
    while True: 
     fileChanged = skip_get(q) 
     if fileChanged is None: 
      return 
     # do stuff with fileChanged 

Теперь, прежде чем запустить обозреватель, сделайте следующее:

t = threading.Thread(target=consumer) 
t.start() 

И в конце:

observer.join() 
q.put(None) 
t.join() 

Итак, как это работает ?

Прежде всего, давайте посмотрим на сторону потребителя. Когда вы вызываете q.get(), это выдает первое сообщение из очереди. Но что, если ничего не будет? Для этого нужен аргумент block. Если это неверно, get вызовет исключение queue.Empty. Если это правда, get будет ждать вечно (в потокобезопасном режиме), пока что-то не появится. Итак, блокируя один раз, мы обрабатываем случай, когда еще ничего не читать. К тому времени цикл без блокировки, мы потребляем что-нибудь еще в очереди, чтобы обрабатывать случай, когда слишком много вещей для чтения. Поскольку мы продолжаем переназначать value тем, что мы выскочили, то, что мы заканчиваем, - это последнее, что помещено в очередь.

Теперь давайте посмотрим на сторону производителя. Когда вы звоните q.put(value), это просто помещает value в очередь. Если вы не установили ограничение по размеру в очереди (чего у меня нет), это не может быть заблокировано, поэтому вам не нужно беспокоиться об этом. Но теперь, как вы сигнализируете поток потребителей, что вы закончили? Он будет ждать в q.get(block=True) навсегда; единственный способ разбудить его - дать ему некоторую ценность поп-музыке. Путем нажатия на контрольное значение (в этом случае None в порядке, так как оно недействительно в качестве имени файла), и заставляя потребителя обращаться с этим None, прекратив работу, мы предоставим нам отличный, чистый способ отключения. (И потому, что мы никогда ничего не нажимаем после None, нет никаких шансов случайно пропустить его.) Итак, мы можем просто нажать None, а затем убедитесь, что (за исключением любых других ошибок) потребительский поток в конечном итоге уйдет, что означает, что мы можем сделать t.join() подождать, пока он это сделает, не опасаясь тупика.


Я упомянул выше, что вы можете сделать это проще всего с Condition.Если вы думаете о том, как работает очередь, это просто список (или deque или что-то еще), защищенное условием: потребитель ждет этого состояния, пока не появится что-то, и производитель сделает что-то доступным, добавив его в список и сигнализируя о состоянии. Если вы только хотите получить последнее значение, в списке нет причин. Таким образом, вы можете сделать это:

class OneQueue(object): 
    def __init__(self): 
     self.value = None 
     self.condition = threading.Condition() 
     self.sentinel = object() 
    def get(self): 
     with self.condition: 
      while self.value is None: 
       self.condition.wait() 
      value, self.value = self.value, None 
      return value 
    def put(self, value): 
     with self.condition: 
      self.value = value 
      self.condition.notify() 
    def close(self): 
     self.put(self.sentinel) 

(Потому что я теперь с помощью None сигнализировать, что ничего не доступно, мне пришлось создать отдельную часовой сигнал, что мы сделали.)

В проблема с этим дизайном заключается в том, что если производители ставят несколько значений, в то время как потребитель слишком занят, чтобы справиться с ними, он может пропустить некоторые из них, но в этом случае эта «проблема» именно то, что вы искали.

Тем не менее, использование инструментов нижнего уровня всегда означает, что есть намного больше, чтобы ошибиться, и это особенно опасно с синхронизацией потоков, поскольку это связано с трудностями, которые трудно обернуть вокруг, и трудно отлаживать, даже когда вы поймите их, так что вам может быть лучше использовать Queue в любом случае.

+0

Хорошо, дайте ему попробовать, спасибо вам, – Martin

+0

@Martin: Хорошо, вы понимаете, что я здесь делаю? Вы могли бы упростить это довольно легко, просто используя одно изменяемое значение с условием «Условие»; Я использовал только очередь, потому что вы сказали, что хотите пропустить очередь, поэтому я решил, что вы поймете это ... – abarnert

+0

Я довольно новичок в программировании, и я пытаюсь получить эту концепцию. Я просто протестировал его, и у меня все еще есть проблема, что, когда я добавляю три файла в папку, пока программа все еще работает над первой, все остальные выполняются после того, как первая будет выполнена. Как я могу предотвратить эту форму? Я хочу, чтобы последний файл был добавлен в используемую папку и забыл о других, добавленных, пока код все еще работал над первым. – Martin

Смежные вопросы