2015-09-01 3 views
0

Я получаю, что мой basicQos игнорируется, когда потребитель падает, а после этого потребитель встает. Например, предположим, что потребитель не работает и 5 сообщений поступают от производителя. Если потребитель не работает, эти сообщения будут храниться на диске (я думаю!), Если обменник/очередь является долговременной.Предварительная выборка RabbitMQ игнорируется, когда потребитель падает и получает вверх

если я установил basicQos как channel.basicQos (0, 3, true), мой потребитель получает более 3 сообщений, когда он получает UP. Зачем?!?

С другой стороны, все работает правильно (только 3 сообщения считываются из очереди), если потребитель работает, когда он получает сообщения из очереди ... Мой код выглядит следующим образом:

factory = new ConnectionFactory(); 
factory.setHost(mRabbitMQHost); //may get server address from file configuration. 
factory.setUsername(mRabbitMQUsername); 
factory.setPassword(mRabbitMQPassword); 
connection = factory.newConnection(); 
channel = connection.createChannel(); 

channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability 
consumer = new QueueingConsumer(channel); 

for (QGQueues queue : QGQueues.values()) { 
    String queueName = queue.getQueueName(); 
    channel.queueDeclare(queueName, true, false, false, null); 
    channel.queueBind(queueName, "exchangeName", queue.getRoutingKey()); 
    channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server  
} 

channel.basicQos(0, 3, true); 

Спасибо !

+0

Возможно, я решил проблему. basicQos необходимо вызвать перед определением потребителей. –

ответ

1

Моя ставка будет заключаться в том, что вам необходимо установить QoS, прежде чем делать что-либо еще.

Изменить код этого заказа:



channel = connection.createChannel(); 

// set QoS immediately 
channel.basicQos(0, 3, true); 

channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability 
consumer = new QueueingConsumer(channel); 

for (QGQueues queue : QGQueues.values()) { 
    String queueName = queue.getQueueName(); 
    channel.queueDeclare(queueName, true, false, false, null); 
    channel.queueBind(queueName, "exchangeName", queue.getRoutingKey()); 
    channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server  
} 

это обеспечит предел упреждающей установлен, прежде чем пытаться потреблять любые сообщения.

+0

Спасибо, Дерик! Это решение! –

+0

отметьте как ответ, пожалуйста? ;) –

+0

Да, basicQos сообщает RabbitMQ, как потребитель ожидает получать сообщения, поэтому его нужно отправить перед basicConsume –

Смежные вопросы