2014-09-11 2 views
2

Я использую SpringBoot для запуска приложения SpringAMQP, которое подключается к очередям RabbitMQ. Я хотел бы иметь возможность отправить сообщение от производителя, указав очередь ответа, чтобы потребитель мог отправить сообщение без необходимости расследовать пункт назначения (следовательно, не нужно передавать данные ответа в самом сообщении).Ответ на задание в SpringAMQP заранее задан?

это конфигурация Я (общий между производителем и потребителем)

private static final String QUEUE_NAME = "testQueue"; 
private static final String ROUTING_KEY = QUEUE_NAME; 
public static final String REPLY_QUEUE = "replyQueue"; 
private static final String USERNAME = "guest"; 
private static final String PASSWORD = "guest"; 
private static final String IP = "localhost"; 
private static final String VHOST = "/"; 
private static final int PORT = 5672; 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME)); 
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE)); 
    return template; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP); 
    connectionFactory.setUsername(USERNAME); 
    connectionFactory.setPassword(PASSWORD); 
    connectionFactory.setVirtualHost(VHOST); 
    connectionFactory.setPort(PORT); 
    return connectionFactory; 
} 

Я посылаю сообщение следующим образом:

public Object sendAndReply(String queue, String content){ 
     return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() { 

      @Override 
      public Message postProcessMessage(Message message) throws AmqpException { 
       message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE); 
       return message; 
      } 
     }); 
    } 

и ожидает ответа следующим образом:

public void replyToQueue(String queue){ 
    template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() { 
     @Override 
     public Data handle(Data payload) { 
      System.out.println("Received: "+payload.toString()); 
      return new Data("This is a reply for: "+payload.toString()); 
     } 
    }); 
} 

При отправке, однако, я получаю следующее исключение:

Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66) 
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663) 
    at prodsend.Prod.sendAndReply(ReplyTester.java:137) 
    at prodsend.ReplyTester.sendMessages(ReplyTester.java:49) 
    at prodsend.ReplyTester.main(ReplyTester.java:102) 
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. 
    at org.springframework.util.Assert.isNull(Assert.java:89) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835) 
    ... 8 more 

линия ReplyTest.137 указывает на линию return в методе sendAndReply выше.


EDIT: Вот класс данных, который упоминается выше :)

class Data{ 
    public String d; 
    public Data(String s){ d = s; } 
    public String toString() { return d; } 
} 

ответ

6

От documentation:

Базовая модель RPC. Отправьте сообщение на обмен по умолчанию с определенным ключом маршрутизации и попытайтесь получить ответ. Реализации обычно устанавливают заголовок ответа в эксклюзивную очередь и ждут некоторое время, ограниченное таймаутом.

Так метод convertSendAndReceive обрабатывает установки заголовка replyTo и возвращает Messaage - ответ. Это синхронный шаблон - RPC.

Если вы хотите сделать это асинхронно - что вам кажется - не используйте этот метод. Используйте соответствующий метод convertAndSend и используйте соответствующий MessagePostProcessor, чтобы добавить заголовок replyTo.

Поскольку это асинхронно, вам необходимо зарегистрировать обработчик для получения ответа. Это должно быть сделано до, отправив сообщение другой стороне. Затем этот обработчик будет вызываться в какой-то момент после отправки сообщения - когда неизвестно. Прочтите раздел 3.5.2 Асинхронный потребитель из Spring AQMP Documentation.

Таким образом, асинхронный поток процесса:

  1. отправитель регистрирует обработчик на replyTo queueue
  2. отправитель посылает сообщение с replyTo набор
  3. клиент вызывает receiveAndReply, обрабатывает сообщение и отправляет ответ на replyTo
  4. инициируется метод обратного вызова отправителя

Поток синхронного процесса:

  1. Отправитель посылает сообщение, используя sendAndReceive и блоки
  2. клиент вызывает receiveAndReply, обрабатывает сообщение и посылает ответ отправителя replyTo
  3. получает ответ, будит и процессы его

Так что последний случай требует от отправителя ждать. Поскольку вы используете receiveXXX вместо регистрации асинхронных обработчиков, отправитель может ждать очень долгое время, если клиент занимает некоторое время, чтобы добраться до звонка receiveXXX.

Кстати, если вы хотите использовать синхронный подход, но используете конкретный replyTo, вы всегда можете позвонить setReplyQueue. Существует также setReplyTimeout для случая, когда я упоминаю, где клиент либо не удосуживается прочитать сообщение, либо забывает ответить.

+0

Каким будет подход для асинхронного метода? – mangusbrother

+0

@mangusbrother обновленный ответ - это немного сложнее, чем это. Разве это не всегда? –