У меня есть класс для потребительского клиента pika, основанный на примере кода pika для TornadoConnection. Я пытаюсь потреблять из очереди тем. Проблема заключается в том, что, поскольку соединение установлено асинхронным образом, мне неизвестно, когда будет установлен канал или когда очередь объявлена. Мой класс:Потребление нескольких тем с pika и TornadoConnection
class PikaClient(object):
""" Based on:
http://pika.readthedocs.org/en/latest/examples/tornado_consumer.html
https://reminiscential.wordpress.com/2012/04/07/realtime-notification-delivery-using-rabbitmq-tornado-and-websocket/
"""
def __init__(self, exchange, exchange_type):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self.exchange = exchange
self.exchange_type = exchange_type
self.queue = None
self.event_listeners = set([])
def connect(self):
logger.info('Connecting to RabbitMQ')
cred = pika.PlainCredentials('guest', 'guest')
param = pika.ConnectionParameters(
host='localhost',
port=5672,
virtual_host='/',
credentials=cred,
)
return pika.adapters.TornadoConnection(param,
on_open_callback=self.on_connection_open)
def close_connection(self):
logger.info('Closing connection')
self._connection.close()
def on_connection_closed(self, connection, reply_code, reply_text):
self._channel = None
if not self._closing:
logger.warning('Connection closed, reopening in 5 seconds: (%s) %s',
reply_code, reply_text)
self._connection.add_timeout(5, self.reconnect)
def on_connection_open(self, connection):
logger.info('Connected to RabbitMQ')
self._connection.add_on_close_callback(self.on_connection_closed)
self._connection.channel(self.on_channel_open)
def reconnect(self):
if not self._closing:
# Create a new connection
self._connection = self.connect()
def on_channel_closed(self, channel, reply_code, reply_text):
logger.warning('Channel %i was closed: (%s) %s',
channel, reply_code, reply_text)
self._connection.close()
def on_channel_open(self, channel):
logger.info('Channel open, declaring exchange')
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self._channel.exchange_declare(self.on_exchange_declareok,
self.exchange,
self.exchange_type,
passive=True,
)
def on_exchange_declareok(self, unused_frame):
logger.info('Exchange declared, declaring queue')
self._channel.queue_declare(self.on_queue_declareok,
exclusive=True,
auto_delete=True,
)
def on_queue_declareok(self, method_frame):
self.queue = method_frame.method.queue
def bind_key(self, routing_key):
logger.info('Binding %s to %s with %s',
self.exchange, self.queue, routing_key)
self._channel.queue_bind(self.on_bindok, self.queue,
self.exchange, routing_key)
def add_on_cancel_callback(self):
logger.info('Adding consumer cancellation callback')
self._channel.add_on_cancel_callback(self.on_consumer_cancelled)
def on_consumer_cancelled(self, method_frame):
logger.info('Consumer was cancelled remotely, shutting down: %r',
method_frame)
if self._channel:
self._channel.close()
def on_message(self, unused_channel, basic_deliver, properties, body):
logger.debug('Received message # %s from %s',
basic_deliver.delivery_tag, properties.app_id)
#self.notify_listeners(body)
def on_cancelok(self, unused_frame):
logger.info('RabbitMQ acknowledged the cancellation of the consumer')
self.close_channel()
def stop_consuming(self):
if self._channel:
logger.info('Sending a Basic.Cancel RPC command to RabbitMQ')
self._channel.basic_cancel(self.on_cancelok, self._consumer_tag)
def start_consuming(self):
logger.info('Issuing consumer related RPC commands')
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self.on_message, no_ack=True)
def on_bindok(self, unused_frame):
logger.info('Queue bound')
self.start_consuming()
def close_channel(self):
logger.info('Closing the channel')
self._channel.close()
def open_channel(self):
logger.info('Creating a new channel')
self._connection.channel(on_open_callback=self.on_channel_open)
def run(self):
self._connection = self.connect()
def stop(self):
logger.info('Stopping')
self._closing = True
self.stop_consuming()
logger.info('Stopped')
Пример для кода с помощью его (внутри WebSocketHandler.open):
self.pc = PikaClient('agents', 'topic')
self.pc.run()
self.pc.bind_key('mytopic.*')
При попытке запуска этого bind_key бросает исключение, потому что _channel до сих пор никто. Но я не нашел способ блокировки до тех пор, пока не будут установлены канал и очередь. Есть ли способ сделать это с помощью динамического списка тем (который может измениться после запуска потребителя)?
Как бы вы изменили список тем после инициализации клиента? Я подумал о том, чтобы остановить клиента и начать новый экземпляр, но должен быть лучший способ. – noamk