2015-01-28 2 views
1

У меня есть простой асинхронный потребитель для AMQP/RabbitMQ, написанный на Python с использованием библиотеки Pika и основанный на Asynchronous consumer example из документов Pika. Главное отличие состоит в том, что я хочу запустить мой в потоке, и я хочу, чтобы он правильно закрыл соединение, а затем выходил (т. Е. Завершал поток) через определенный промежуток времени. Вот мои методы, чтобы открыть соединение и установить тайм-аут. Я также открываю канал, создаю обмен и привязываю очередь ... все, что отлично работает.метод close() для адаптера Pica SelectConnection не закрывает соединение

def connect(self): 
    LOGGER.info('OPEN connection...') 
    return pika.SelectConnection(self._parameters, self.on_connection_open, stop_ioloop_on_close=False) 

def on_connection_open(self, unused_connection): 
    LOGGER.info('Connection opened') 
    self.add_on_connection_close_callback() 
    self._connection.add_timeout(5, self.timer_tick) 
    self.open_recv_channel() 

Вот тайм-аут обратного вызова:

def timer_tick(self): 
    LOGGER.info('---TICK---') 
    self._stop() 

Вот _stop метод:

def _stop(self): 
    LOGGER.info('Stopping...') 
    self._connection.close() 
    LOGGER.info('Stopped') 
    time.sleep(5) 
    self._connection.ioloop.stop() 

Вот метод запуска, который запускает нить:

def run(self): 
    print "-Run Started-" 
    self._connection = self.connect() 
    self._connection.ioloop.start() 
    print "-Run Finished-" 

Вот основной бит основного():

client = TestClient() 
client.start() 
client.join() 
LOGGER.info('Returned.') 
time.sleep(30) 

Моя проблема в том, что «self._connection.close()» не будет работать должным образом. Я добавил обратный вызов on_close:

self._connection.add_on_close_callback(self.on_connection_closed) 

Но on_connection_closed() никогда не вызывается. Кроме того, соединение НЕ закрыто. Я вижу это в веб-интерфейсе управления RabbitMQ, и он остается даже после завершения потока. Вот результат:

-Run Started- 
2015-01-28 14:39:28,431: OPEN connection... 
2015-01-28 14:39:28,491: Queue bound 
(...[snipped] various other messages here...) 
2015-01-28 14:39:28,491: Issuing consumer related RPC commands 
2015-01-28 14:39:28,491: Adding consumer cancellation callback 
(Pause here waiting for timeout callback) 
2015-01-28 14:39:33,505: ---TICK--- 
2015-01-28 14:39:33,505: Stopping... 
2015-01-28 14:39:33,505: Closing connection (200): Normal shutdown 
2015-01-28 14:39:33,505: Stopped 
-Run Finished- 
2015-01-28 14:39:39,507: Returned. 

«соединение Закрытия (200): Нормальное выключение» происходит от Пика, но ни один из моих ON_CLOSE или on_cancel обратных вызовов не называется, начинаю ли я, закрывая канал, или просто закрыть соединение. Единственное, что работает, - это остановить пользователя с помощью «basic_cancel», из-за чего вызывается мой «on_cancel_callback».

Я хочу использовать цикл в основной программе для создания и уничтожения потребительских потоков, но на данный момент каждый раз, когда я запускаю его, я заканчиваю сиротским соединением, поэтому количество соединений увеличивается на неопределенный срок. Соединения DO исчезают, когда программа закрывается.

Использование connection.close() должен работать: С Pika Docs:

близко (reply_code = 200, REPLY_TEXT = 'Нормальное выключение')

Отключиться от RabbitMQ. Если есть какие-либо открытые каналы, он попытается закрыть их до полного отключения. Каналы, в которых активные пользователи будут пытаться отправить Basic.Cancel в RabbitMQ, должны полностью прекратить доставку сообщений до закрытия канала.

+0

pika не является потокобезопасным, а соединения не должны использоваться разными потоками. –

+0

Горе! Это может быть так. Я не видел ничего о потоках в документах, но когда я искал, я обнаружил, что это ** [первая вещь в FAQ] (http://pika.readthedocs.org/en/latest/faq.html?highlight=thread) ** Возможно, вам стоит оставить свой комментарий в ответе? – Jeremy

+0

Я сделал, но не был уверен, что вы используете соединение. Это не было очевидно из вашего кода. –

ответ

1

Если вы используете соединение между вашими потоками, это может вызвать проблемы. pika не является потокобезопасным, а соединения не должны использоваться разными потоками.

First bit из FAQ:

Q:

Является ли Пика поточно?

A:

Пика не имеет никакого понятия многопоточности в коде. Если вы хотите использовать Pika с потоком, убедитесь, что у вас есть соединение Pika на поток, созданный в этой теме. Нельзя делиться одним соединением Pika по потокам.

+0

Это разрешило прерывистую проблему в одной версии моего тестового кода, где у меня был отдельный поток, который прослушивал сообщения, но разделял тот же объект соединения, что и поток, который отправлял сообщения. ОДНАКО, я еще раз взглянул на версию «SelectConnection», о которой я говорю в вопросе. Он открывает NEW CONNECTION для каждого потока, так что все должно быть в порядке. Следовательно, я должен был не принимать ваш ответ ... – Jeremy

+0

Я понял. Вот почему я начал с комментария. Тем не менее, люди, приезжающие сюда, могут быть заинтересованы. Майн взглянуть на «кролику» –

Смежные вопросы