Наше приложение построено и работает на: -
spring.version = 3.2.8.RELEASE
spring.amqp.version = 1.2.0.RELEASE
rabbitmq.version = 3.1.3 (клиент) версия
RabbitMQ сервер 3.1.5Ошибка при upgarding Spring-AMQP от 1.2.0 до 1.3.0
мы хотели обновить сервер RabbitMQ от 3.1.5 до 3.3.5, и мы сделали это успешно.
Теперь мы хотели обновить приложение, чтобы использовать последнюю версию весна-AMQP, Java клиент RabbitMQ, поэтому мы должны обновить следующие компоненты: -
spring.version = 3.2.8.RELEASE
весна .amqp.version = 1.3.0.RELEASE
rabbitmq.version = 3.2.4 (клиент)
RabbitMQ версия Сервер 3.3.5
Однако после обновления до весенне-AMQP 1.3.0, наше приложение, запущенное повесить. В основном мы запускаем множество контейнеров-слушателей во время запуска приложения, и запуск каждого контейнера-слушателя теперь занимает ровно 60 секунд, чтобы перейти к следующему этапу.
После глубокого рытья я обнаружил, что программа получает зависание в методе run() в классе SimpleMessageListenerContainer : -
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
public void run() {
boolean aborted = false;
int consecutiveIdles = 0;
int consecutiveMessages = 0;
try {
try {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary(); // Here is where the programing thread hangs.
this.consumer.start();
this.start.countDown();
}
Как уже упоминалось выше, код, поток получает висел на redeclareElementsIfNecessary() метод, и этот метод вводится в этой версии весенне только кролик. Я понятия не имею, почему его повесили там, независимо от предлагаемого параметра, который я передаю этому SimpleMessageListenerContainer, он, похоже, не работает.
Если я вернусь к выпуску Spring-amqp 1.2.0 с новым сервером RabbitMQ 3.3.5, все, кажется, работает нормально, но все не работает с новым клиентом spring-amqp.
Я сейчас застрял здесь пару дней. Мастер Spring/Rabbitmq, вы можете помочь мне решить эту проблему?
Спасибо за ваш быстрый ответ, однако, кажется, что код не доходит до этой точки, и он просто зацикливаться только выше фрагмент кода, который вы предоставляете, меня комментарий ниже о том, где код получает навесил точно ниже
Set<String> queueNames = this.getQueueNamesAsSet();
Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class); // The code started to hung here
for (Entry<String, Queue> entry : queueBeans.entrySet()) {
Queue queue = entry.getValue();
if (queueNames.contains(queue.getName()) && queue.isAutoDelete()
&& this.rabbitAdmin.getQueueProperties(queue.getName()) == null) {
if (logger.isDebugEnabled()) {
logger.debug("At least one auto-delete queue is missing: " + queue.getName()
+ "; redeclaring context exchanges, queues, bindings.");
}
this.rabbitAdmin.initialize();
break;
}
}
на самом деле мы обновили до последней версии только весной-AMQP, что
spring.version = 3.2.8.RELEASE
spring.amqp.version = 1.3.6.RELEASE
rabbitmq.version = 3.3.4 (client)
RabbitMQ Server version is 3.3.5
, однако мы столкнулись с одной и той же проблемой, поэтому, чтобы узнать, из какой версии проблема началась, я спустился к более низким версиям до 1.3.0, похоже, проблема начинается с версии 1.0.0 Spring-amqp. вот в чем причина.
Я приложил запрошенную информацию, включая отвалы резьбы, которые основаны только на весеннем amqp 1.3.6.
Вот конфигурация нашего слушателя контейнер, в котором программы висит, как вы можете видеть, что мы имеем нашу собственную SimpleMessageLinstenerContainer, который действует как обертка для SimpleMessageListenerContainer в acutal весны, Я также прилагается этот пользовательский файл оболочки для справки.
<bean id="tlogOutOfCycleMessageListenerPrototype" class="com.myorg.ips.cnccommon.support.amqp.SimpleMessageListenerContainer" scope="prototype">
<property name="channelTransacted" value="true" />
<property name="transactionManager" ref="transactionManager" />
<property name="concurrentConsumers" value="1" />
<property name="taskExecutor" ref="tlogOutOfCycleMessageListenerPool" />
<property name="messageListener" ref="tlogMLAOutOfCycle" />
<property name="errorHandler" ref="tlogOutOfCycleMessageHandler" />
<property name="autoStartup" value="false" />
<property name="instanceNameForLogging" value="site1TlogOutOfCycleMessageListener"/>
<!-- A dummy connection factory which will never be used -->
<property name="connectionFactory" ref="switchCompositeConnectionFactoryPrototype"/>
</bean>
Наша обертка класс SimpleMessageListenerContainer.java
package com.myorg.ips.cnccommon.support.amqp;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.cal10n.LocLogger;
import org.springframework.util.ErrorHandler;
import com.myorg.ips.amqp.SwitchSiteSupport;
import com.myorg.ips.logging.LoggerFactory;
import com.myorg.ips.system.config.InitialisableSiteAware;
import static com.myorg.ips.logging.SystemWideLogMessages.ERROR_AMQP_FAILED_TO_START_LISTENER;
import static com.myorg.ips.logging.SystemWideLogMessages.INFO_AMQP_STOPPING_LISTENER;
/**
*
* Wrapper for the Spring SimpleMessageListenerContainer which simply allows us to delay (or prevent startup). Can also restart on command.
*
*/
public class SimpleMessageListenerContainer extends org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer implements InitialisableSiteAware {
private static final LocLogger logger = LoggerFactory.getLogger(SimpleMessageListenerContainer.class);
private boolean autoStart = true;
private ErrorHandler exposedErrorHandler;
private boolean springBeanInitialisationAttempted = false;
private boolean springBeanInitialised = false;
private String instanceNameForLogging;
@Override
public void initialize() {
// Do nothing -- we will instead perform the Spring bean initialisation later on via the factory bean, after the connection factory has been set
springBeanInitialisationAttempted = true;
springBeanInitialised = false;
}
@Override
public void initialise() {
SwitchSiteSupport.initialiseIfSiteAware(getMessageListener());
SwitchSiteSupport.initialiseIfSiteAware(getErrorHandler());
// If this object is a Spring bean, we should now complete the initialisation that the Spring framework attempted earlier
if (springBeanInitialisationAttempted && !springBeanInitialised) {
springBeanInitialised = true;
super.initialize();
if (isAutoStartup()) {
start();
}
}
}
@Override
public void configureForSite(final MultiHostConnectionFactory configuredConnectionFactory) {
setConnectionFactory(configuredConnectionFactory);
SwitchSiteSupport.configureIfSiteAware(getMessageListener(), configuredConnectionFactory);
SwitchSiteSupport.configureIfSiteAware(getErrorHandler(), configuredConnectionFactory);
setInstanceNameForLogging(SwitchSiteSupport.replaceWithSiteAlias(instanceNameForLogging, configuredConnectionFactory));
}
@Override
//CHECKSTYLE:OFF Unfortunately the parent springframework class throws and exception, so so do we
protected void doStart() throws Exception {
//CHECKSTYLE:ON
if (autoStart) {
logger.debug("Starting message listener " + instanceNameForLogging);
super.doStart();
logger.debug("Started message listener " + instanceNameForLogging);
}
}
/**
* Start this listener
*/
public void start() {
autoStart = true;
try {
doStart();
//CHECKSTYLE:OFF Unfortunately the parent springframework class throws and exception, so that is what we catch
} catch (Exception e) {
//CHECKSTYLE:ON
logger.error(ERROR_AMQP_FAILED_TO_START_LISTENER, e);
}
}
/**
* Stop listener
*/
public void stop() {
logger.info(INFO_AMQP_STOPPING_LISTENER, getBeanName());
autoStart = false;
doStop();
}
/**
* Stop and start this listener
*/
public void restart() {
stop();
start();
}
/**
* Store the errorHandler in a subclass-specific property so that we can retrieve it later
* @param errorHandler errorHandler
*/
@Override
public void setErrorHandler(final ErrorHandler errorHandler) {
this.exposedErrorHandler = errorHandler;
super.setErrorHandler(errorHandler);
}
/**
* Return the exposed errorHandler
* @return errorHandler
*/
public ErrorHandler getErrorHandler() {
return exposedErrorHandler;
}
public void setInstanceNameForLogging(final String instanceNameForLogging) {
this.instanceNameForLogging = instanceNameForLogging;
}
@Override
public String toString(){
return ToStringBuilder.reflectionToString(this);
}
}
Пожалуйста, также возьмите свалку потока, когда находитесь в состоянии «висели». Также, пожалуйста, используйте последнюю версию в выпуске - последняя версия 1.3.x - 1.3.6; с версии 1.3.0 было исправлено множество ошибок. Меня интересует, что привело вас к версии 1.3.0, а не к текущему выпуску. На странице [project page] (http://projects.spring.io/spring-amqp/) четко отображается текущее. –
Просто для объяснения; основной поток удерживает блокировку в контексте приложения, которая необходима контейнеру во время 'start()'. Вы должны __never__ выполнять какую-либо реальную деятельность (например, запуск контейнеров) во время инициализации бина (например, в 'FactoryBean.getObject()'). По определению контекст приложения все еще создается. Вам просто повезло, что у вас не было таких проблем до сих пор. –
Привет, Артем/Гэри, ваше предложение действительно помогло решить проблему сортировки, мы использовали SmartLifecycle в нашем компоненте Listnermanger, который фактически запускает контейнеры-слушатели в методе initialise(), который вызывается при запуске контейнера, Я теперь использовал ApplicationListener, чтобы контейнеры-слушатели запускались после того, как контекст приложения get был инициализирован, теперь он исправил зависание и задержки при запуске нашего приложения. –