Я использую MQ в первый раз и пытаюсь внедрить систему регистрации с помощью RabbitMQ. Моя реализация включает в себя «отправитель»Потребитель не получает сообщение от MQ, когда сообщение отправляется до того, как потребитель слушает
/*
* This class sends messages over MQ
*/
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
}
channel.close();
connection.close();
}
}
который посылает одно сообщение для каждого из моих цветов к обмену, где цвет будет использоваться в качестве привязки. И это включает в себя «приемник»
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
receiveMessagesFromQueue(2);
}
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
который дается в качестве параметра число, представляющее какой цвет привязок я хотел бы, чтобы питаться от обмена.
В моей реализации и в RabbitMQ в общем, кажется, что сообщения хранятся в обмене, пока Consumer
не запросит их, и в этот момент они распределяются в соответствующие очереди, а затем отправляются по одному клиенту (или потребителя в MQ lingo). Моя проблема заключается в том, что когда я запускаю класс MQSender
перед запуском класса MQReceiver
, сообщения никогда не доставляются. Но когда я запускаю класс MQReceiver
, первые сообщения принимаются. Из моего понимания MQ я бы подумал, что сообщения должны храниться на сервере до тех пор, пока не будет запущен класс MQReceiver
, тогда сообщения должны быть доставлены их потребителям, однако это не то, что происходит. Мой главный вопрос заключается в том, могут ли эти сообщения храниться в обмене, а если нет, где они должны храниться так, чтобы они были доставлены после вызова потребителя (то есть моего класса MQReceiver
)?
Благодарим за помощь!
только предположение, но я подозреваю, что ваш 'Sender' является отбрасывая сообщение из-за отсутствия зарегистрированных 'Consumer's – StormeHawke
, возможно, у вас autoAck настроено на tru е? подробнее здесь: http://www.rabbitmq.com/tutorials/tutorial-two-java.html –
тоже это возможно? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –