2015-07-31 2 views
1

У меня проблема с потребителем RabbitMQ. На самом деле у меня есть один потребитель, получающий сообщения из трех очередей. Проблема в том, что мне нужно получить несколько сообщений от каждого из них, но мой потребитель получает только одну очередь и заканчивает получение. Буду признателен, если кто-то поможет мне решить эту проблему.RabbitMQ: несколько сообщений и один потребитель

Потребительское код ниже

 for (int i = 0; i < queueNames.size(); i++) { 

     Channel channel = connection.createChannel(); 
     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer); 

     flag = true; 
     while (flag) { 

      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String routingKey = delivery.getEnvelope().getRoutingKey(); 
      System.out.println(routingKey); 
      String message = new String(delivery.getBody(), "UTF-8"); 

       flag = false; 
     } 
    } 

где queueNames представляет собой список, содержащий имена моих очередей (в количестве 3).

ответ

0

ОК я решить проблему таким образом:

boolean flag; 
    System.out.println("Rozmiar queue " + queueNames.size()); 
    for (int i = 0; i < queueNames.size(); i++) { 

     Channel channel = connection.createChannel(); 
     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queueNames.get(i).toString(), true, consumer_tag, consumer); 

     flag = true; 
     while (flag) { 

      QueueingConsumer.Delivery delivery = consumer.nextDelivery(timeout); 
      if (delivery == null) { 
       flag = false; 
      } else { 

       String message = new String(delivery.getBody(), "UTF-8"); 
       System.out.println(" [x] Message Received '" + message + "'"); 
      } 
     } 
    } 

Я надеюсь, что это решение может помочь кому-то в будущем :)

1

Вы должны подписаться на очереди, потребитель будет потреблять только 1 сообщение так, как вы определили его

boolean autoAck = false; 
channel.basicConsume(queueName, autoAck, "myConsumerTag", 
new DefaultConsumer(channel) { 
    @Override 
    public void handleDelivery(String consumerTag, 
           Envelope envelope, 
           AMQP.BasicProperties properties, 
           byte[] body) 
     throws IOException 
    { 
     String routingKey = envelope.getRoutingKey(); 
     String contentType = properties.getContentType(); 
     long deliveryTag = envelope.getDeliveryTag(); 
     // (process the message components here ...) 
     channel.basicAck(deliveryTag, false); 
    } 
}); 

Больше информации здесь: https://www.rabbitmq.com/api-guide.html

+0

Когда я ма ke всего лишь while (true), все сообщения из одной очереди принимаются, но здесь возникает другая проблема. Потребитель не знает, есть ли еще сообщения и все еще ждет их. Я хотел бы пойти в другую очередь, если для меня больше не будет сообщений, которые будут получены из этого цикла. Любые предложения, как это сделать? – MrPug