1

У меня есть пассивный инфракрасный датчик, и я хотел отключить и на моем дисплее, исходя из движения. Например. если в течение 5 минут нет движения, тогда дисплей должен отключиться для экономии энергии. Однако, если есть движение, не выключайте дисплей или не включайте его снова. (Не спрашивайте, почему для этого не подходит скринсейвер. У устройства, которое я создаю, не будет никакой клавиатуры или мыши. Это будет только отдельный дисплей.)Более элегантный способ проверки события/запуска периодически?

Моя идея состояла в том, чтобы создать два потока , производителя и потребителя. Нить производителя (датчик PIR) помещает сообщение в очередь, которое читает потребитель (который контролирует отображение). Таким образом, я могу отправлять сигналы от одного к другому.

У меня есть полностью функциональный код ниже (с некоторым объяснением), который завершает описанное ранее. Мой вопрос в том, что есть какой-то способ достичь этого более элегантным способом? Что вы думаете о моем подходе, все в порядке, хакер?

#!/usr/bin/env python 

import Queue 
from threading import Thread 
import RPi.GPIO as gpio 
import time 
import os 
import sys 

class PIRSensor: 

    # PIR sensor's states 
    current_state = 0 
    previous_state = 0 

    def __init__(self, pir_pin, timeout): 
     # PIR GPIO pin 
     self.pir_pin = pir_pin 
     # Timeout between motion detections 
     self.timeout = timeout 

    def setup(self): 
     gpio.setmode(gpio.BCM) 
     gpio.setup(self.pir_pin, gpio.IN) 
     # Wait for the PIR sensor to settle 
     # (loop until PIR output is 0) 
     while gpio.input(self.pir_pin) == 1: 
      self.current_state = 0 

    def report_motion(self, queue): 
     try: 
      self.setup() 

      while True: 
       self.current_state = gpio.input(self.pir_pin) 

       if self.current_state == 1 and self.previous_state == 0: 
        # PIR sensor is triggered 
        queue.put(True) 
        # Record previous state 
        self.previous_state = 1 
       elif self.current_state == 1 and self.previous_state == 1: 
        # Feed the queue since there is still motion 
        queue.put(True) 
       elif self.current_state == 0 and self.previous_state == 1: 
        # PIR sensor has returned to ready state 
        self.previous_state = 0 

       time.sleep(self.timeout) 
     except KeyboardInterrupt: 
      raise 

class DisplayControl: 

    # Display's status 
    display_on = True 

    def __init__(self, timeout): 
     self.timeout = timeout 

    def turn_off(self): 
     # Turn off the display 
     if self.display_on: 
      os.system("/opt/vc/bin/tvservice -o > /dev/null 2>&1") 
      self.display_on = False 

    def turn_on(self): 
     # Turn on the display 
     if not self.display_on: 
      os.system("{ /opt/vc/bin/tvservice -p && chvt 9 && chvt 7 ; } > /dev/null 2>&1") 
      self.display_on = True 

    def check_motion(self, queue): 
     try: 
      while True: 
       try: 
        motion = queue.get(True, self.timeout) 

        if motion: 
         self.turn_on() 
       except Queue.Empty: 
         self.turn_off() 
     except KeyboardInterrupt: 
      raise 

if __name__ == "__main__": 
    try: 
     pir_sensor = PIRSensor(7, 0.25) 
     display_control = DisplayControl(300) 
     queue = Queue.Queue() 

     producer = Thread(target=pir_sensor.report_motion, args=(queue,)) 
     consumer = Thread(target=display_control.check_motion, args=(queue,)) 

     producer.daemon = True 
     consumer.daemon = True 

     producer.start() 
     consumer.start() 

     while True: 
      time.sleep(0.1) 
    except KeyboardInterrupt: 
     display_control.turn_on() 
     # Reset GPIO settings 
     gpio.cleanup() 
     sys.exit(0) 

Производитель нить выполняет функцию (report_motion) из экземпляра класса PIRSensor. Класс PIRSensor считывает состояние пассивного инфракрасного датчика четыре раза в секунду, и всякий раз, когда он воспринимает движение, сообщение помещается в очередь.

Потребительский поток выполняет функцию (check_motion) экземпляра класса DisplayControl. Он считывает ранее упомянутую очередь в режиме блокировки с заданным таймаутом. Может произойти следующее:

  • Если дисплей включен, и нет никаких сообщений в очереди для данного времени, иначе тайм-аут истекает, потребитель нить дисплея выключение питания.
  • Если дисплей выключен и пришло сообщение, на дисплее будет подаваться питание .

ответ

1

Я думаю, что это хорошее решение. Причина в том, что вы разделили проблемы для разных классов. Один класс обрабатывает датчик PIR. Один обрабатывает дисплей. Сегодня вы склеиваете их вместе в очереди, это один из подходов.

Посредством этого вы можете легко протестировать различные классы.

Чтобы увеличить это (прочитайте, сделайте его возможным), вы можете ввести контроллер. Контроллер получает события (например, из очереди) и действует на события (например, сообщите диспетчеру дисплея об отключении дисплея). Контроллер знает о датчике и знает о дисплее. Но датчик не должен знать о дисплее или наоборот. (это очень похоже на MVC, где в этом случае данные являются моделью (датчиком), на дисплее отображается вид, а контроллер находится между ними.

Этот подход делает проект прочным, расширяемым, ремонтопригодным. вы не хаки, вы пишете реальный код.

2

Я думаю, что идея хорошая. Единственный вопрос, который я имею о вашей реализации, - это почему у потребителя и производителя в дочерних потоках? Вы могли бы просто сохранить потребителя в основном потоке, и тогда не было бы необходимости иметь этот бессмысленный цикл в вашем основном потоке.

while True: 
    time.sleep(0.1) 

который просто растрачивает циклы процессора. Вместо этого вы можете просто позвонить display_motion.check_motion(queue) напрямую.

Смежные вопросы