1

Я пытаюсь реализовать конфигурацию RabbitMQ, которая позволит мне использовать фиксированную очередь ответов вместо того, чтобы появляться сотни очередей темпов. Мое первое сообщение, которое публикуется, получает немедленный ответ через очередь ответа, второе, третье, а иногда даже пятое сообщение, просто дает мне стекную строку Reply received after timeout. Если я немного подожду и отправлю другое сообщение, я снова получаю ответ с немедленными очередными сообщениями с той же ошибкой.Исправлено время ожидания очереди ответа после второго сообщения

На стороне издателя, у меня есть следующая конфигурация:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> 
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/> 
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/> 
</bean> 
<rabbit:connection-factory 
     id="connectionFactory" 
     port="${rabbit.port}" 
     virtual-host="${rabbit.virtual}" 
     host="${rabbit.host}" 
     username="${rabbit.username}" 
     password="${rabbit.password}" 
     connection-factory="nativeConnectionFactory"/> 
<rabbit:admin connection-factory="connectionFactory"/> 
<rabbit:template 
     id="amqpTemplate" 
     connection-factory="connectionFactory" 
     reply-timeout="${rabbit.rpc.timeout}" 
     reply-queue="reply"> 
    <rabbit:reply-listener /> 
</rabbit:template> 

<rabbit:queue id="reply" name="reply" /> 

На стороне потребителя я имею следующую конфигурацию:

<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> 
    <property name="connectionTimeout" value="${rabbit.connection.timeout}"/> 
    <property name="requestedHeartbeat" value="${rabbit.heartbeat}"/> 
</bean> 
<rabbit:connection-factory 
     id="connectionFactory" 
     port="${rabbit.port}" 
     virtual-host="${rabbit.virtual}" 
     host="${rabbit.host}" 
     username="${rabbit.username}" 
     password="${rabbit.password}" 
     connection-factory="nativeConnectionFactory"/> 
<rabbit:admin connection-factory="connectionFactory"/> 
<rabbit:template 
     id="amqpTemplate" 
     connection-factory="connectionFactory" 
     reply-timeout="${rabbit.rpc.timeout}" 
     reply-queue="reply"> 
    <rabbit:reply-listener concurrency="${rabbit.consumers}" /> 
</rabbit:template> 

<!-- Register Queue Listener Beans --> 
<rabbit:listener-container 
     connection-factory="connectionFactory" 
     channel-transacted="true" 
     requeue-rejected="true" 
     concurrency="${rabbit.consumers}"> 
    <rabbit:listener queues="test" ref="TestProcessor" method="onMessage" /> 
</rabbit:listener-container> 

<rabbit:queue id="test" name="test" /> 
<rabbit:queue id="reply" name="reply" /> 

Я использую пружинный AMQP 1.4.4 в так, что это никакой пользы:

<dependency> 
     <groupId>org.springframework.amqp</groupId> 
     <artifactId>spring-rabbit</artifactId> 
     <version>1.4.4.RELEASE</version> 
    </dependency> 

Это, как я строю мое сообщение и опубликовать его:

MessageProperties properties = new MessageProperties();   
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON); 
Message message = new Message(toJson(request).getBytes(), properties); 
Message res = getTemplate().sendAndReceive(exchange, queue, message); 

шаблона является просто autowired экземпляр AmqpTemplate:

@Autowired 
AmqpTemplate template; 

Первое сообщение получает немедленный ответ, второе сообщение (и третий и так далее) получает следующую StackTrace на стороне потребителя:

2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34 
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed. 
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103) 
    at java.lang.Thread.run(Thread.java:744) 
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276) 
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799) 
    ... 10 more 

... в то время как издатель просто отключается после того, как не получил ответа в очереди ответа.

Это, как я ответить на сообщение на стороне потребителя:

@Override 
    public void onMessage(Message message, Channel channel) throws Exception { 
     try { 
      ... 
      System.out.println(message); 
      // handle reply-to 
      if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) { 

       Message res = new Message(toJson(response).getBytes(), message.getMessageProperties()); 
       getTemplate().send("", message.getMessageProperties().getReplyTo(), res); 

      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
      // TODO: forward to exception queue here 
     }  
    } 

That System.out.println(message); печатает следующее:

(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0]) 

Любые идеи?

ответ

1

У вас есть 2 шаблона кроликов, каждый из которых использует одну и ту же reply очередь, поэтому «второй» ответ «принимается» шаблоном на стороне потребителя (следовательно, сообщение журнала, потому что оно получило «ответ», когда нет выдающийся запрос, ожидающий ответа - это уже на стороне производителя).

Обратите внимание, что, поскольку rabbitmq 3.4, как правило, лучше использовать новый кролик встроенный direct reply-to feature; он, как правило, решает все причины, по которым нам пришлось реализовать механизм фиксированного ответа на очередь. Поддержка прямого ответа была добавлена ​​в Spring AMQP 1.4.1.RELEASE.

+0

А, это имеет смысл. Поэтому для каждого приложения требуется отдельная очередь ответов, если я хочу использовать фиксированные очереди ответов. Как использовать функцию прямого ответа?Просто удалите ответ-прослушиватель и удалите очередь ответа? –

+1

> просто удалите ... Да, если вы используете rabbitmq 3.4.x или выше. –

0

После нескольких дней вмешательства, единственное, что я понял с помощью sendAndReceive() шаблона rabit, - это никогда не вмешиваться в ключи привязки и позволить фреймворку устанавливать его в очередь. Он отлично работает в этом смысле, но если я использую свои мозги над настройкой, многое может пойти не так.

Прямо сейчас я застреваю с получением идентификатора корреляции, и тот, который я получаю, не совпадает с тем, что я отправил. Как это возможно ?

+1

Гэри, не могли бы вы помочь нам здесь с хорошим примером для RPC с конфигурацией xml вместе с шаблонами и отправкой и получением. –

+0

Я считаю, что это относится к отдельному вопросу, я написал много шаблонов, поэтому должен быть в состоянии помочь. –

Смежные вопросы