2013-08-22 4 views
1

Я использую MQ в первый раз и пытаюсь внедрить систему регистрации с помощью RabbitMQ. Моя реализация включает в себя «отправитель»Потребитель не получает сообщение от MQ, когда сообщение отправляется до того, как потребитель слушает

/* 
* This class sends messages over MQ 
*/ 
public class MQSender { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     String[] levels = {"green", "orange", "red", "black"}; 
     for (String log_level : levels) { 
      String message = "This is a " + log_level + " message"; 
      System.out.println("Sending " + log_level + " message"); 
      //publish the message with each of the bindings in levels 
      channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes()); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

который посылает одно сообщение для каждого из моих цветов к обмену, где цвет будет использоваться в качестве привязки. И это включает в себя «приемник»

public class MQReceiver { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     receiveMessagesFromQueue(2); 
    } 

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     //generate random queue 
     String queueName = channel.queueDeclare().getQueue(); 

     //set bindings from 0 to maxLevel for the queue 
     for (int level = 0; level <= maxLevel; level++) { 
      channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]); 
     } 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queueName, true, consumer); 

     while(true) { 
      //waits until a message is delivered then gets that message 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      String routingKey = delivery.getEnvelope().getRoutingKey(); 

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); 
     } 
    } 
} 

который дается в качестве параметра число, представляющее какой цвет привязок я хотел бы, чтобы питаться от обмена.

В моей реализации и в RabbitMQ в общем, кажется, что сообщения хранятся в обмене, пока Consumer не запросит их, и в этот момент они распределяются в соответствующие очереди, а затем отправляются по одному клиенту (или потребителя в MQ lingo). Моя проблема заключается в том, что когда я запускаю класс MQSender перед запуском класса MQReceiver, сообщения никогда не доставляются. Но когда я запускаю класс MQReceiver, первые сообщения принимаются. Из моего понимания MQ я бы подумал, что сообщения должны храниться на сервере до тех пор, пока не будет запущен класс MQReceiver, тогда сообщения должны быть доставлены их потребителям, однако это не то, что происходит. Мой главный вопрос заключается в том, могут ли эти сообщения храниться в обмене, а если нет, где они должны храниться так, чтобы они были доставлены после вызова потребителя (то есть моего класса MQReceiver)?

Благодарим за помощь!

+0

только предположение, но я подозреваю, что ваш 'Sender' является отбрасывая сообщение из-за отсутствия зарегистрированных 'Consumer's – StormeHawke

+0

, возможно, у вас autoAck настроено на tru е? подробнее здесь: http://www.rabbitmq.com/tutorials/tutorial-two-java.html –

+0

тоже это возможно? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –

ответ

1

RabbitMQ отбрасывает сообщения, если их ключ маршрутизации не соответствует каким-либо очередям, связанным с обменом. При первом запуске MQSender очереди не привязаны, поэтому отправляемые сообщения теряются. Когда вы начинаете MQReceiver, он связывает очереди с биркой, поэтому у RabbitMQ есть место, чтобы поместить сообщение от MQSender. Когда вы останавливаете MQReceiver, поскольку вы создали анонимную очередь, очередь и все привязки удаляются из обмена.

Если вы хотите, чтобы на сервере хранились сообщения, а MQReceiver не работает, вам нужен приемник для создания именованной очереди и привязки ключей маршрутизации к этой очереди. Обратите внимание, что создание именованной очереди является idempotent, и очередь не будет создана, если она уже существует. Затем вам нужно, чтобы получатель вытащил сообщения из именованной очереди.

Изменить код выглядеть примерно так:

MQSender

.... 
String namedQueue = "logqueue"; 
//declare named queue and bind log level routing keys to it. 
//RabbitMQ will put messages with matching routing keys in this queue 
channel.queueDeclare(namedQueue, false, false, false, null); 
for (int level = 0; level < LOG_LEVELS.length; level++) { 
    channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]); 
} 
... 

MQReceiver

... 
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

QueueingConsumer consumer = new QueueingConsumer(channel); 

//Consume messages off named queue instead of anonymous queue 
String namedQueue = "logqueue"; 
channel.basicConsume(namedQueue, true, consumer); 

while(true) { 
...