2013-02-12 3 views
1

Я пишу помощники по домашней автоматизации - это в основном небольшие приложения типа python, подобные демонам. Они могут запускать каждый как отдельный процесс, но, поскольку будет сделано, я решил, что я поставил маленького диспетчера, который породит каждого из демонов в их собственных потоках и сможет действовать, и в будущем будет умирать нить.правильная потоковая обработка в python

Это то, что он выглядит (работает с двумя классами):

from daemons import mosquitto_daemon, gtalk_daemon 
from threading import Thread 

print('Starting daemons') 
mq_client = mosquitto_daemon.Client() 
gt_client = gtalk_daemon.Client() 

print('Starting MQ') 
mq = Thread(target=mq_client.run) 
mq.start() 

print('Starting GT') 
gt = Thread(target=gt_client.run) 
gt.start() 

while mq.isAlive() and gt.isAlive(): 
    pass 
print('something died') 

Проблема заключается в том, что MQ-демон (moquitto) будет работать нормально мне запустить его прямо:

mq_client = mosquitto_daemon.Client() 
mq_client.run() 

It запустится и повесится там, слушая все сообщения, которые попадают в соответствующие темы - именно то, что я ищу.

Однако запуск в диспетчере заставляет его действовать странно - он получит одно сообщение, а затем прекратит действовать, пока поток, как сообщается, не будет жив. Учитывая, что он отлично работает без резьбы woodoo, я предполагаю, что я делаю что-то неправильно в диспетчере.

Я цитирую код клиента MQ только в случае, если:

import mosquitto 
import config 
import sys 
import logging 


class Client(): 
    mc = None 

    def __init__(self): 
     logging.basicConfig(format=u'%(filename)s:%(lineno)d %(levelname)-8s [%(asctime)s] %(message)s', level=logging.DEBUG) 
     logging.debug('Class initialization...') 
     if not Client.mc: 
      logging.info('Creating an instance of MQ client...') 
      try: 
       Client.mc = mosquitto.Mosquitto(config.DEVICE_NAME) 
       Client.mc.connect(host=config.MQ_BROKER_ADDRESS) 
       logging.debug('Successfully created MQ client...') 
       logging.debug('Subscribing to topics...') 
       for topic in config.MQ_TOPICS: 
        result, some_number = Client.mc.subscribe(topic, 0) 
        if result == 0: 
         logging.debug('Subscription to topic "%s" successful' % topic) 
        else: 
         logging.error('Failed to subscribe to topic "%s": %s' % (topic, result)) 
       logging.debug('Settings up callbacks...') 
       self.mc.on_message = self.on_message 
       logging.info('Finished initialization') 
      except Exception as e: 
       logging.critical('Failed to complete creating MQ client: %s' % e.message) 
       self.mc = None 
     else: 
      logging.critical('Instance of MQ Client exists - passing...') 
      sys.exit(status=1) 

    def run(self): 
     self.mc.loop_forever() 

    def on_message(self, mosq, obj, msg): 
     print('meesage!!111') 
     logging.info('Message received on topic %s: %s' % (msg.topic, msg.payload)) 
+0

Кроме того, делая 'mc' атрибут класса, а не атрибут экземпляра может вызвать другие проблемы, если вы икру более одного экземпляра для каждого процесса (' 'Client.mc' против self.mc' и' тс = None' вместо того, чтобы просто устанавливать 'self.mc' на этапе' __init__'), в зависимости от того, как работает клиент-комаров с несколькими экземплярами. –

ответ

2

Вы передаете Threadrun метод другого класса экземпляра ... Это на самом деле не знает, что делать с ним.

threading.Thread может использоваться двумя способами: создать независимую функцию, завитанную нитью, или как базовый класс для класса с методом run. В вашем случае это похоже на базовый класс - это путь, так как ваш класс Client имеет метод run.

Заменить следующее в вашем MQ классе, и он должен работать:

from threading import Thread 

class Client(Thread): 
    mc = None 

    def __init__(self): 
     Thread.__init__(self) # initialize the Thread instance 
     ... 
    ... 

    def stop(self): 
     # some sort of command to stop mc 
     self.mc.stop() # not sure what the actual command is, if one exists at all... 

Тогда при вызове его, сделать это без Thread:

mq_client = mosquitto_daemon.Client() 
mq_client.start() 

print 'Print this line to be sure we get here after starting the thread loop...' 
+0

это действительно не помогло, я пытаюсь понять, почему. – abolotnov

+0

Я не могу действительно проверить этот код без вашего полного кода времени выполнения (или автономной усеченной версии). Возможно, вам также придется реализовать метод Stop, который распространяется на клиента mosquitto, а также с демоном gtalk. Я отредактировал свой ответ, чтобы добавить метод stop. –

+0

Я нашел пару опечаток в моем коде и преобразовал все в переменные экземпляра, но это не помогло. Демон Google Talk использует тот же класс Mosquitto, что и демоны подписчиков Mosquitto, и я предполагаю, что это вызывает некоторую путаницу. Я в конечном итоге отказался от игры с этим и преобразовал все, чтобы использовать RabbitMQ, и он отлично работает :) – abolotnov

1

Несколько вещей, чтобы рассмотреть следующие вопросы:

  1. zeromq ненавидит инициализацию в 1 потоке и запускается в другой. Вы можете переписать Client() как Thread, как предлагается, или написать свою собственную функцию, которая создаст Client и запустит эту функцию в потоке.

  2. Клиент() имеет переменную уровня mc. Я предполагаю, что mosquitto_daemon и gtalk_daemon используют один и тот же Клиент, и поэтому они находятся в споре, за которого побеждает Client.mc.

  3. «while mq.isAlive() и gt.isAlive(): pass» будет есть целый процессор, потому что он просто продолжает опрос снова и снова без сна. Учитывая, что python только квазипоточен (Global Interpreter Lock (GIL) позволяет запускать только один поток за один раз), это остановит ваши «демоны».

  4. Также, учитывая GIL, реализация демона orignal, скорее всего, будет работать лучше.

Смежные вопросы