У меня есть простой асинхронный потребитель для AMQP/RabbitMQ, написанный на Python с использованием библиотеки Pika и основанный на Asynchronous consumer example из документов Pika. Главное отличие состоит в том, что я хочу запустить мой в потоке, и я хочу, чтобы он правильно закрыл соединение, а затем выходил (т. Е. Завершал поток) через определенный промежуток времени. Вот мои методы, чтобы открыть соединение и установить тайм-аут. Я также открываю канал, создаю обмен и привязываю очередь ... все, что отлично работает.метод close() для адаптера Pica SelectConnection не закрывает соединение
def connect(self):
LOGGER.info('OPEN connection...')
return pika.SelectConnection(self._parameters, self.on_connection_open, stop_ioloop_on_close=False)
def on_connection_open(self, unused_connection):
LOGGER.info('Connection opened')
self.add_on_connection_close_callback()
self._connection.add_timeout(5, self.timer_tick)
self.open_recv_channel()
Вот тайм-аут обратного вызова:
def timer_tick(self):
LOGGER.info('---TICK---')
self._stop()
Вот _stop метод:
def _stop(self):
LOGGER.info('Stopping...')
self._connection.close()
LOGGER.info('Stopped')
time.sleep(5)
self._connection.ioloop.stop()
Вот метод запуска, который запускает нить:
def run(self):
print "-Run Started-"
self._connection = self.connect()
self._connection.ioloop.start()
print "-Run Finished-"
Вот основной бит основного():
client = TestClient()
client.start()
client.join()
LOGGER.info('Returned.')
time.sleep(30)
Моя проблема в том, что «self._connection.close()» не будет работать должным образом. Я добавил обратный вызов on_close:
self._connection.add_on_close_callback(self.on_connection_closed)
Но on_connection_closed() никогда не вызывается. Кроме того, соединение НЕ закрыто. Я вижу это в веб-интерфейсе управления RabbitMQ, и он остается даже после завершения потока. Вот результат:
-Run Started-
2015-01-28 14:39:28,431: OPEN connection...
2015-01-28 14:39:28,491: Queue bound
(...[snipped] various other messages here...)
2015-01-28 14:39:28,491: Issuing consumer related RPC commands
2015-01-28 14:39:28,491: Adding consumer cancellation callback
(Pause here waiting for timeout callback)
2015-01-28 14:39:33,505: ---TICK---
2015-01-28 14:39:33,505: Stopping...
2015-01-28 14:39:33,505: Closing connection (200): Normal shutdown
2015-01-28 14:39:33,505: Stopped
-Run Finished-
2015-01-28 14:39:39,507: Returned.
«соединение Закрытия (200): Нормальное выключение» происходит от Пика, но ни один из моих ON_CLOSE или on_cancel обратных вызовов не называется, начинаю ли я, закрывая канал, или просто закрыть соединение. Единственное, что работает, - это остановить пользователя с помощью «basic_cancel», из-за чего вызывается мой «on_cancel_callback».
Я хочу использовать цикл в основной программе для создания и уничтожения потребительских потоков, но на данный момент каждый раз, когда я запускаю его, я заканчиваю сиротским соединением, поэтому количество соединений увеличивается на неопределенный срок. Соединения DO исчезают, когда программа закрывается.
Использование connection.close() должен работать: С Pika Docs:
близко (reply_code = 200, REPLY_TEXT = 'Нормальное выключение')
Отключиться от RabbitMQ. Если есть какие-либо открытые каналы, он попытается закрыть их до полного отключения. Каналы, в которых активные пользователи будут пытаться отправить Basic.Cancel в RabbitMQ, должны полностью прекратить доставку сообщений до закрытия канала.
pika не является потокобезопасным, а соединения не должны использоваться разными потоками. –
Горе! Это может быть так. Я не видел ничего о потоках в документах, но когда я искал, я обнаружил, что это ** [первая вещь в FAQ] (http://pika.readthedocs.org/en/latest/faq.html?highlight=thread) ** Возможно, вам стоит оставить свой комментарий в ответе? – Jeremy
Я сделал, но не был уверен, что вы используете соединение. Это не было очевидно из вашего кода. –