FYI, я новичок с RabbitMQ.convertSendAndReceive не работает, когда очередь с лопатой
У меня есть этот случай использования для моего приложения, для которого я пытаюсь использовать RabbitMQ:
- Производитель посылает некоторое сообщение в очередь
- Потребительский процесс и отправить ответ
- Основываясь на Ответ Производитель действует на
Для вышеуказанного сценария я использовал convertSendAndReceive, который работает как шарм, когда производитель и потребитель находятся на одном сервере RabbitMQ. Но то же самое не работает, когда очередь перекошена.
Пожалуйста, дайте мне знать, если я использую неправильный метод/конфигурацию w.r.t RabbitMQ.
Заранее спасибо.
Добавление кода
Потребитель
public static void main(String[] args) throws InterruptedException {
ConnectionFactory cf = new CachingConnectionFactory("10.223.19.89");
// set up the queue, exchange, binding on the broker
RabbitAdmin admin = new RabbitAdmin(cf);
Queue queue = new Queue("myQueue");
Queue queueReply = new Queue("myQueue_reply");
admin.declareQueue(queue);
admin.declareQueue(queueReply);
TopicExchange exchange = new TopicExchange("myExchange");
admin.declareExchange(exchange);
admin.declareBinding(
BindingBuilder.bind(queue).to(exchange).with("foo.*"));
admin.declareBinding(
BindingBuilder.bind(queueReply).to(exchange).with("foo.*"));
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(cf);
Object listener = new Object() {
public String handleMessage(String foo) {
return foo + "test";
}
};
MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
container.setMessageListener(adapter);
container.setQueueNames("myQueue");
container.start();
}
Производитель
public void run()
{
Thread t = Thread.currentThread();
ConnectionFactory cf = new CachingConnectionFactory("10.223.19.93");
RabbitTemplate template = new RabbitTemplate(cf);
template.setExchange("myExchange");
template.setRoutingKey("foo.bar");
Queue queueReply = new Queue("myQueue_reply");
template.setReplyQueue(queueReply);
Object test = template.convertSendAndReceive("Hello world");
System.out.println(test.toString());
}
public static void main(String[] args) throws InterruptedException {
for(int i=0; i< 5; i++)
{
Thread t = new Thread(new SendReceiveThread());
t.setName("Thread # " + i);
t.start();
Thread.sleep(100);
}
}
Также обратите внимание, что я сконфигурировал заголовки fwd в Shovel –