2015-06-09 2 views
1

Я использую функцию AmqpTemplate.sendAndReceive с нескольких серверов, она работает, и повторное соединение работает после перезапуска кролика или сетевых проблем.Spring amqp RPC-запрос на получение кроликовqq

Он работал большой в течение нескольких недель, но вдруг один из моих серверов становятся тайма-аута для каждого sendAndReceive вызова, RabbitMQ получает сообщение и обрабатываемый но sendAndReceive не получает ответ (ответ-тайм-аут установлен на 60 секунд, и для обработки сообщения требуется всего несколько секунд). Другие серверы работали одновременно с одним и тем же кодом и с использованием той же очереди.

Сервер вернулся к работе только после перезапуска службы на нем.

Я думаю, что это проблема пересоединения (даже если сообщения отправлены на rabbitmq успешно), возможно, прослушиватель ответов AmqpTemplate не пересоединился или что-то в этом роде.

Кто-нибудь знает, в чем проблема? и как я могу предотвратить , чтобы это произошло снова?

мой ConnectionFactory установка:

setConnectionTimeout(1000); 
setRequestedHeartbeat(100); 
setTopologyRecoveryEnabled(true); 
setAutomaticRecoveryEnabled(true); 

Edit:

весна-AMQP версия: 1.4.3

<bean id="myConnectionFactory" class="path.to.myConnectionFactoryClass"></bean> 

<rabbit:connection-factory id="myRabbitConnectionFactory" connection-factory="myConnectionFactory" channel-cache-size="25" /> 

<rabbit:template id="myTemplate" connection-factory="myRabbitConnectionFactory" reply-timeout="65000" /> 

Edit:

Это случилось снова, я вижу что он перестает работать после получения ошибки:

org.springframework.amqp.AmqpConnectException: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException 
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:51) 
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:110) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1051) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820) 
at MyClass.onMessage(MyClass.java:1234) 
at sun.reflect.GeneratedMethodAccessor3558.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) 
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) 
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207) 
at com.sun.proxy.$Proxy95.onMessage(Unknown Source) 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:237) 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756) 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241) 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$100(SimpleMessageListenerContainer.java:82) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:975) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$2.doInTransaction(SimpleMessageListenerContainer.java:968) 
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:968) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; cause: java.io.EOFException 
at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:174) 
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:496) 
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:96) 
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:42) 
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:747) 
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:736) 
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:416) 
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:392) 
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$500(CachingConnectionFactory.java:75) 
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:623) 
at com.sun.proxy.$Proxy74.basicCancel(Unknown Source) 
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:944) 
at org.springframework.amqp.rabbit.core.RabbitTemplate$7.doInRabbit(RabbitTemplate.java:902) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1045) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:902) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:894) 
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820) 

Не знаете, почему соединение закрыто и почему оно не было повторно подключено.

+0

Пожалуйста, предоставьте полную конфигурацию и версию Spring AMQP. Spring AMQP имеет свой механизм восстановления соединения (который предшествует версии для кролика-клиента на долгое время). Версии до 1.4 не совместимы с rabbitmq 'automaticRecoveryEnabled'. –

+0

Спасибо Гэри за ваш комментарий, я использую v1.4.3, и я редактировал сообщение с дополнительной информацией о конфигурации. –

+0

Это довольно простая настройка; нет ничего, чтобы «восстановить» на принимающей стороне, поскольку он использует прямой ответ (или временную очередь, в зависимости от версии rabbitmq) для ответов; вы видите что-нибудь полезное в журналах? (это приложение и/или кроличковые журналы). –

ответ

0

Я думаю, что это решено. Я не поймал исключения в OnMesaage, и я считаю, что это убило моих слушателей.

0

at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:820)

at MyClass.onMessage(MyClass.java:1234) ...

Так что это не восстановление серверной части; ваш MyClass вызывает другую операцию отправки и получения шаблона кролика после получения сообщения. Таким образом, он действует как клиент в этой ситуации.

Однако, как правило, это исключение было бы отправлено в контейнер, а rabbitmq повторно отправит сообщение (если у вас нет режима подтверждения NONE), пока ваш слушатель выдает исключение контейнеру. Если ваш слушатель ловит исключение и ничего не делает с ним, кроме ведения журнала, вы увидите этот результат.

Если вы не хотите, чтобы кролик повторно отправил сообщение об ошибке или входящий контейнер не подтвердил сообщения, вы можете настроить RetryTemplate в исходящий шаблон кролика для восстановления соединения; см. the documentation.

Если это не объясняет вашу ситуацию, вам необходимо показать полную конфигурацию, код в MyClass.onMessage() и полный журнал.

Смежные вопросы