2015-06-02 3 views
2

У меня есть класс для потребительского клиента pika, основанный на примере кода pika для TornadoConnection. Я пытаюсь потреблять из очереди тем. Проблема заключается в том, что, поскольку соединение установлено асинхронным образом, мне неизвестно, когда будет установлен канал или когда очередь объявлена. Мой класс:Потребление нескольких тем с pika и TornadoConnection

class PikaClient(object): 
    """ Based on: 
    http://pika.readthedocs.org/en/latest/examples/tornado_consumer.html 
    https://reminiscential.wordpress.com/2012/04/07/realtime-notification-delivery-using-rabbitmq-tornado-and-websocket/ 
    """ 

    def __init__(self, exchange, exchange_type): 
     self._connection = None 
     self._channel = None 
     self._closing = False 
     self._consumer_tag = None 

     self.exchange = exchange 
     self.exchange_type = exchange_type 
     self.queue = None 

     self.event_listeners = set([]) 

    def connect(self): 
     logger.info('Connecting to RabbitMQ') 

     cred = pika.PlainCredentials('guest', 'guest') 
     param = pika.ConnectionParameters(
      host='localhost', 
      port=5672, 
      virtual_host='/', 
      credentials=cred, 
     ) 

     return pika.adapters.TornadoConnection(param, 
      on_open_callback=self.on_connection_open) 

    def close_connection(self): 
     logger.info('Closing connection') 
     self._connection.close() 

    def on_connection_closed(self, connection, reply_code, reply_text): 
     self._channel = None 
     if not self._closing: 
      logger.warning('Connection closed, reopening in 5 seconds: (%s) %s', 
          reply_code, reply_text) 
      self._connection.add_timeout(5, self.reconnect) 

    def on_connection_open(self, connection): 
     logger.info('Connected to RabbitMQ') 
     self._connection.add_on_close_callback(self.on_connection_closed) 
     self._connection.channel(self.on_channel_open) 

    def reconnect(self): 
     if not self._closing: 
      # Create a new connection 
      self._connection = self.connect() 

    def on_channel_closed(self, channel, reply_code, reply_text): 
     logger.warning('Channel %i was closed: (%s) %s', 
         channel, reply_code, reply_text) 
     self._connection.close() 

    def on_channel_open(self, channel): 
     logger.info('Channel open, declaring exchange') 
     self._channel = channel 
     self._channel.add_on_close_callback(self.on_channel_closed) 
     self._channel.exchange_declare(self.on_exchange_declareok, 
             self.exchange, 
             self.exchange_type, 
             passive=True, 
             ) 

    def on_exchange_declareok(self, unused_frame): 
     logger.info('Exchange declared, declaring queue') 
     self._channel.queue_declare(self.on_queue_declareok, 
            exclusive=True, 
            auto_delete=True, 
            ) 

    def on_queue_declareok(self, method_frame): 
     self.queue = method_frame.method.queue 

    def bind_key(self, routing_key): 
     logger.info('Binding %s to %s with %s', 
        self.exchange, self.queue, routing_key) 
     self._channel.queue_bind(self.on_bindok, self.queue, 
           self.exchange, routing_key) 

    def add_on_cancel_callback(self): 
     logger.info('Adding consumer cancellation callback') 
     self._channel.add_on_cancel_callback(self.on_consumer_cancelled) 

    def on_consumer_cancelled(self, method_frame): 
     logger.info('Consumer was cancelled remotely, shutting down: %r', 
        method_frame) 
     if self._channel: 
      self._channel.close() 

    def on_message(self, unused_channel, basic_deliver, properties, body): 
     logger.debug('Received message # %s from %s', 
        basic_deliver.delivery_tag, properties.app_id) 
     #self.notify_listeners(body) 

    def on_cancelok(self, unused_frame): 
     logger.info('RabbitMQ acknowledged the cancellation of the consumer') 
     self.close_channel() 

    def stop_consuming(self): 
     if self._channel: 
      logger.info('Sending a Basic.Cancel RPC command to RabbitMQ') 
      self._channel.basic_cancel(self.on_cancelok, self._consumer_tag) 

    def start_consuming(self): 
     logger.info('Issuing consumer related RPC commands') 
     self.add_on_cancel_callback() 
     self._consumer_tag = self._channel.basic_consume(self.on_message, no_ack=True) 

    def on_bindok(self, unused_frame): 
     logger.info('Queue bound') 
     self.start_consuming() 

    def close_channel(self): 
     logger.info('Closing the channel') 
     self._channel.close() 

    def open_channel(self): 
     logger.info('Creating a new channel') 
     self._connection.channel(on_open_callback=self.on_channel_open) 

    def run(self): 
     self._connection = self.connect() 

    def stop(self): 
     logger.info('Stopping') 
     self._closing = True 
     self.stop_consuming() 
     logger.info('Stopped') 

Пример для кода с помощью его (внутри WebSocketHandler.open):

self.pc = PikaClient('agents', 'topic') 
self.pc.run() 
self.pc.bind_key('mytopic.*') 

При попытке запуска этого bind_key бросает исключение, потому что _channel до сих пор никто. Но я не нашел способ блокировки до тех пор, пока не будут установлены канал и очередь. Есть ли способ сделать это с помощью динамического списка тем (который может измениться после запуска потребителя)?

ответ

0

У вас действительно есть способ узнать, когда очередь установлена ​​- метод on_queue_declareok(). Этот обратный вызов будет выполняться после того, как метод self.queue_declare закончил, и self.queue_declare выполняется один раз _channel.exchance_declare закончил, и т.д. Вы можете проследить цепочку весь путь обратно к методу выполнения:

run ->connect ->on_connection_open ->_connection.channel ->on_channel_open ->_channel.exchange_declare ->on_exchange_declareok ->_channel.queue_declare ->on_queue_declareok

Таким образом, вы просто добавить вызов (ей) bind_key к on_queue_declareok, и что будет инициировать вызов on_bindok, который будет вызывать start_consuming. В этот момент ваш клиент фактически прослушивает сообщения. Если вы хотите иметь возможность динамически предоставлять темы, просто возьмите их в конструкторе PikaClient. Затем вы можете позвонить по телефону bind_key с каждой стороны on_queue_declareok. Вам также необходимо добавить флаг, указывающий, что вы уже начали потреблять, поэтому вы не пытаетесь сделать это дважды.

Нечто подобное (предположим, что все методы, не показанные ниже остаются теми же):

def __init__(self, exchange, exchange_type, topics=None): 
    self._topics = [] if topics is None else topics 
    self._connection = None 
    self._channel = None 
    self._closing = False 
    self._consumer_tag = None 
    self._consuming = False 

    self.exchange = exchange 
    self.exchange_type = exchange_type 
    self.queue = None 

    self.event_listeners = set([]) 

def on_queue_declareok(self, method_frame): 
    self.queue = method_frame.method.queue 
    for topic in self._topics: 
     self.bind_key(topic) 

def start_consuming(self): 
    if self._consuming: 
     return 
    logger.info('Issuing consumer related RPC commands') 
    self.add_on_cancel_callback() 
    self._consumer_tag = self._channel.basic_consume(self.on_message, no_ack=True) 
    self._consuming = True 
+0

Как бы вы изменили список тем после инициализации клиента? Я подумал о том, чтобы остановить клиента и начать новый экземпляр, но должен быть лучший способ. – noamk

Смежные вопросы