2016-02-10 3 views
0

Я пытаюсь издеваться переменную, которая представляет состояние устройстваДразнящий переменной в Python

В этом случае я пытаюсь добавить устройство и у меня есть следующий код:

if self.network.controller.add_node(secure) : 
    for i in range(0, 60) : 
     if flagStarted : 
      if self.commandState == self.COMMAND_FAILED or self.commandState == self.COMMAND_FAILED : 
      # Transaction Failed or Error 
      self.network.controller.cancel_command() 
      self.log.warning(" *** Add Device Failed *** ") 
      return False 
     elif self.commandState == self.COMMAND_CANCEL : 
      # Transaction Canceled 
      self.log.debug(" *** Command Canceled ") 
      return False 
     elif self.commandState == self.COMMAND_COMPLETED : 
      # Transaction Completed 
      value = ZWaveProtocol.getAddedDevice() 
      if value > 0 : 
       dev = DeviceCollection.getDeviceByProtocolID(value, "ZWave") 
       return dev.id 
      else : 
       if self.commandState == self.COMMAND_STARTING or self.commandState == self.COMMAND_WAITING : 
       flagStarted = True 
     sys.stdout.write(".") 
     sys.stdout.flush() 
     time.sleep(1.0) 

    self.network.controller.cancel_command() 
    return -1 
else : 
    self.log.error("Failed to add device") 

Что я делаю, насмехается над self.network.controller.add_node (безопасно), и когда я это делаю, я меняю self.commandState на запуск .... То, что я хотел выполнить, примерно через 5 секунд изменило его на self.COMMAND_COMPLETED, чтобы успешно завершить операцию.

Любые идеи, как издеваться над этим?

ответ

1

Поскольку ваш код блокируется (программный поток остается в этом цикле до его завершения), опция без чрезмерного использования вашего текущего кода - это охват потока, который изменяет переменную по истечении заданного времени.

Предположим, что ваш код находится внутри метода с именем run_loop и внутри класса MyClass. С учетом простого тестового кода:

def test_1(): 
    obj = MyClass() 
    # Mock something 
    obj.run_loop() 
    # Do your assertions 

Вы можете изменить его на следующее. Я не тестировал его, и его можно многому усовершенствовать, но вы поняли:

from threading import Thread 
from time import sleep 

def change_state(obj): 
    sleep(5) 
    obj.commandState = obj.COMMAND_COMPLETE 

def test_1(): 
    obj = MyClass() 

    # Launch a thread that within 5 seconds will change the state of `obj` 
    Thread(target=change_state, args=[obj]).start() 

    # Call the main loop, which will recognize that the state changed within 5 secs 
    obj.run_loop() 
Смежные вопросы