2015-08-26 3 views
1

Я работаю с Python, RabbitMQ и Pika.RabbitMQ - Потребляйте несколько очередей

У меня есть несколько потребителей и очередей. Я хочу, чтобы один потребитель получал сообщения из каждой очереди. Предположим, у меня 2 потребителя и 3 очереди. Я хочу, чтобы каждый из моих потребителей, чтобы читать сообщения из очереди № 1, 2 и 3.

я смог сделать это с basic_get:

basic_get (queue1)

basic_get (queue2)

basic_get (queue3)

=> Получить одно сообщение из очереди 1, одно из очереди 2, одно из очереди 3, затем повторить.

«Проблема» в том, что я хочу использовать basic_consume, чтобы установить qos (пусть очереди вызывают n сообщений каждый раз). Я хочу получить n сообщений из очереди 1, затем n сообщений из очереди 2, n сообщений из очереди 3, а затем вернуться в очередь 1 и так далее. Я не хочу потреблять все из очереди 1, затем все со второго, ...

Я не нашел способ реализовать basic_consume для нескольких очередей. Возможно ли реализовать basic_consume в моей ситуации?

Кроме того, мне нужны потоки при использовании basic_consume? Он вызывает функцию, когда сообщение отправляется в RabbitMQ. Но это слушатель, как я понимаю. Поэтому я не могу ничего делать, пока он слушает. Использование потоков было бы полезно для обработки задач при прослушивании новых сообщений, не так ли?

Спасибо.

+0

Обратите внимание, что для обмена сообщениями Pika по потокам не существует потокобезопасности - каждая нить должна иметь свое собственное соединение: http://pika.readthedocs.io/en/latest/faq.html –

ответ

0

установить prefetch for the consumer, и вы должны быть в состоянии сделать это.

Я не уверен, о коде Пики конкретно, но пример в этой документации показывает это так:

channel.basicQos(10); // Per consumer limit 
channel.basicConsume("my-queue", false, consumer); 

относительно нитей ... это зависит от ваших конкретных потребностей приложения. часто бывает проще увеличить число экземпляров процессов вместо потоков. но использование потоков, безусловно, является правильным способом распараллеливать обработку сообщений в одном процессе.

Смежные вопросы