Я пытаюсь реализовать конфигурацию RabbitMQ, которая позволит мне использовать фиксированную очередь ответов вместо того, чтобы появляться сотни очередей темпов. Мое первое сообщение, которое публикуется, получает немедленный ответ через очередь ответа, второе, третье, а иногда даже пятое сообщение, просто дает мне стекную строку Reply received after timeout
. Если я немного подожду и отправлю другое сообщение, я снова получаю ответ с немедленными очередными сообщениями с той же ошибкой.Исправлено время ожидания очереди ответа после второго сообщения
На стороне издателя, у меня есть следующая конфигурация:
<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
<property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
id="connectionFactory"
port="${rabbit.port}"
virtual-host="${rabbit.virtual}"
host="${rabbit.host}"
username="${rabbit.username}"
password="${rabbit.password}"
connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
id="amqpTemplate"
connection-factory="connectionFactory"
reply-timeout="${rabbit.rpc.timeout}"
reply-queue="reply">
<rabbit:reply-listener />
</rabbit:template>
<rabbit:queue id="reply" name="reply" />
На стороне потребителя я имею следующую конфигурацию:
<bean id="nativeConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="connectionTimeout" value="${rabbit.connection.timeout}"/>
<property name="requestedHeartbeat" value="${rabbit.heartbeat}"/>
</bean>
<rabbit:connection-factory
id="connectionFactory"
port="${rabbit.port}"
virtual-host="${rabbit.virtual}"
host="${rabbit.host}"
username="${rabbit.username}"
password="${rabbit.password}"
connection-factory="nativeConnectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template
id="amqpTemplate"
connection-factory="connectionFactory"
reply-timeout="${rabbit.rpc.timeout}"
reply-queue="reply">
<rabbit:reply-listener concurrency="${rabbit.consumers}" />
</rabbit:template>
<!-- Register Queue Listener Beans -->
<rabbit:listener-container
connection-factory="connectionFactory"
channel-transacted="true"
requeue-rejected="true"
concurrency="${rabbit.consumers}">
<rabbit:listener queues="test" ref="TestProcessor" method="onMessage" />
</rabbit:listener-container>
<rabbit:queue id="test" name="test" />
<rabbit:queue id="reply" name="reply" />
Я использую пружинный AMQP 1.4.4 в так, что это никакой пользы:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.4.RELEASE</version>
</dependency>
Это, как я строю мое сообщение и опубликовать его:
MessageProperties properties = new MessageProperties();
properties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
Message message = new Message(toJson(request).getBytes(), properties);
Message res = getTemplate().sendAndReceive(exchange, queue, message);
шаблона является просто autowired экземпляр AmqpTemplate:
@Autowired
AmqpTemplate template;
Первое сообщение получает немедленный ответ, второе сообщение (и третий и так далее) получает следующую StackTrace на стороне потребителя:
2015-04-22 07:53:03,329 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.core.RabbitTemplate - Reply received after timeout for 4bfb2f6f-2e31-414c-9ec3-a4672e4c7e34
2015-04-22 07:53:03,330 [SimpleAsyncTaskExecutor-1] WARN org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:864)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:802)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:690)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:799)
... 10 more
... в то время как издатель просто отключается после того, как не получил ответа в очереди ответа.
Это, как я ответить на сообщение на стороне потребителя:
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
...
System.out.println(message);
// handle reply-to
if (message.getMessageProperties() != null && message.getMessageProperties().getReplyTo() != null) {
Message res = new Message(toJson(response).getBytes(), message.getMessageProperties());
getTemplate().send("", message.getMessageProperties().getReplyTo(), res);
}
} catch (Exception e) {
e.printStackTrace();
// TODO: forward to exception queue here
}
}
That System.out.println(message);
печатает следующее:
(Body:'{"message":"Sent 'Test Text' on Wed Apr 22 08:17:13 SAST 2015"}'MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=[56, 50, 98, 100, 100, 56, 53, 54, 45, 57, 101, 100, 102, 45, 52, 99, 54, 97, 45, 97, 55, 51, 101, 45, 102, 54, 48, 101, 50, 49, 48, 53, 55, 101, 97, 48], replyTo=reply, contentType=application/json, contentEncoding=null, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=test, deliveryTag=1, messageCount=0])
Любые идеи?
А, это имеет смысл. Поэтому для каждого приложения требуется отдельная очередь ответов, если я хочу использовать фиксированные очереди ответов. Как использовать функцию прямого ответа?Просто удалите ответ-прослушиватель и удалите очередь ответа? –
> просто удалите ... Да, если вы используете rabbitmq 3.4.x или выше. –