2014-09-29 3 views
1

Наше приложение построено и работает на: -
spring.version = 3.2.8.RELEASE
spring.amqp.version = 1.2.0.RELEASE
rabbitmq.version = 3.1.3 (клиент) версия
RabbitMQ сервер 3.1.5Ошибка при upgarding Spring-AMQP от 1.2.0 до 1.3.0

мы хотели обновить сервер RabbitMQ от 3.1.5 до 3.3.5, и мы сделали это успешно.

Теперь мы хотели обновить приложение, чтобы использовать последнюю версию весна-AMQP, Java клиент RabbitMQ, поэтому мы должны обновить следующие компоненты: -


spring.version = 3.2.8.RELEASE
весна .amqp.version = 1.3.0.RELEASE
rabbitmq.version = 3.2.4 (клиент)
RabbitMQ версия Сервер 3.3.5

Однако после обновления до весенне-AMQP 1.3.0, наше приложение, запущенное повесить. В основном мы запускаем множество контейнеров-слушателей во время запуска приложения, и запуск каждого контейнера-слушателя теперь занимает ровно 60 секунд, чтобы перейти к следующему этапу.

После глубокого рытья я обнаружил, что программа получает зависание в методе run() в классе SimpleMessageListenerContainer : -

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer

public void run() { 

     boolean aborted = false; 

     int consecutiveIdles = 0; 

     int consecutiveMessages = 0; 

     try { 

      try { 
       SimpleMessageListenerContainer.this.redeclareElementsIfNecessary(); // Here is where the programing thread hangs. 
       this.consumer.start(); 
       this.start.countDown(); 
      } 

Как уже упоминалось выше, код, поток получает висел на redeclareElementsIfNecessary() метод, и этот метод вводится в этой версии весенне только кролик. Я понятия не имею, почему его повесили там, независимо от предлагаемого параметра, который я передаю этому SimpleMessageListenerContainer, он, похоже, не работает.

Если я вернусь к выпуску Spring-amqp 1.2.0 с новым сервером RabbitMQ 3.3.5, все, кажется, работает нормально, но все не работает с новым клиентом spring-amqp.

Я сейчас застрял здесь пару дней. Мастер Spring/Rabbitmq, вы можете помочь мне решить эту проблему?


Спасибо за ваш быстрый ответ, однако, кажется, что код не доходит до этой точки, и он просто зацикливаться только выше фрагмент кода, который вы предоставляете, меня комментарий ниже о том, где код получает навесил точно ниже

Set<String> queueNames = this.getQueueNamesAsSet(); 
Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class); // The code started to hung here 
for (Entry<String, Queue> entry : queueBeans.entrySet()) { 
    Queue queue = entry.getValue(); 
    if (queueNames.contains(queue.getName()) && queue.isAutoDelete() 
      && this.rabbitAdmin.getQueueProperties(queue.getName()) == null) { 
     if (logger.isDebugEnabled()) { 
      logger.debug("At least one auto-delete queue is missing: " + queue.getName() 
        + "; redeclaring context exchanges, queues, bindings."); 
     } 
     this.rabbitAdmin.initialize(); 
     break; 
    } 
} 

на самом деле мы обновили до последней версии только весной-AMQP, что

spring.version = 3.2.8.RELEASE 
spring.amqp.version = 1.3.6.RELEASE 
rabbitmq.version = 3.3.4 (client) 
RabbitMQ Server version is 3.3.5 

, однако мы столкнулись с одной и той же проблемой, поэтому, чтобы узнать, из какой версии проблема началась, я спустился к более низким версиям до 1.3.0, похоже, проблема начинается с версии 1.0.0 Spring-amqp. вот в чем причина.

Я приложил запрошенную информацию, включая отвалы резьбы, которые основаны только на весеннем amqp 1.3.6.

Вот конфигурация нашего слушателя контейнер, в котором программы висит, как вы можете видеть, что мы имеем нашу собственную SimpleMessageLinstenerContainer, который действует как обертка для SimpleMessageListenerContainer в acutal весны, Я также прилагается этот пользовательский файл оболочки для справки.

<bean id="tlogOutOfCycleMessageListenerPrototype" class="com.myorg.ips.cnccommon.support.amqp.SimpleMessageListenerContainer" scope="prototype"> 
    <property name="channelTransacted" value="true" /> 
    <property name="transactionManager" ref="transactionManager" /> 
    <property name="concurrentConsumers" value="1" /> 
    <property name="taskExecutor" ref="tlogOutOfCycleMessageListenerPool" /> 
    <property name="messageListener" ref="tlogMLAOutOfCycle" /> 
    <property name="errorHandler" ref="tlogOutOfCycleMessageHandler" /> 
    <property name="autoStartup" value="false" /> 
    <property name="instanceNameForLogging" value="site1TlogOutOfCycleMessageListener"/> 
    <!-- A dummy connection factory which will never be used --> 
    <property name="connectionFactory" ref="switchCompositeConnectionFactoryPrototype"/> 
</bean> 

Наша обертка класс SimpleMessageListenerContainer.java

package com.myorg.ips.cnccommon.support.amqp; 

    import org.apache.commons.lang.builder.ToStringBuilder; 
    import org.slf4j.cal10n.LocLogger; 
    import org.springframework.util.ErrorHandler; 

    import com.myorg.ips.amqp.SwitchSiteSupport; 
    import com.myorg.ips.logging.LoggerFactory; 
    import com.myorg.ips.system.config.InitialisableSiteAware; 

    import static com.myorg.ips.logging.SystemWideLogMessages.ERROR_AMQP_FAILED_TO_START_LISTENER; 
    import static com.myorg.ips.logging.SystemWideLogMessages.INFO_AMQP_STOPPING_LISTENER; 

    /** 
    * 
    * Wrapper for the Spring SimpleMessageListenerContainer which simply allows us to delay (or prevent startup). Can also restart on command. 
    * 
    */ 
    public class SimpleMessageListenerContainer extends org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer implements InitialisableSiteAware { 
     private static final LocLogger logger = LoggerFactory.getLogger(SimpleMessageListenerContainer.class); 
     private boolean autoStart = true; 
     private ErrorHandler exposedErrorHandler; 

     private boolean springBeanInitialisationAttempted = false; 

     private boolean springBeanInitialised = false; 

     private String instanceNameForLogging; 

     @Override 
     public void initialize() { 
      // Do nothing -- we will instead perform the Spring bean initialisation later on via the factory bean, after the connection factory has been set 
      springBeanInitialisationAttempted = true; 
      springBeanInitialised = false; 
     } 

     @Override 
     public void initialise() { 
      SwitchSiteSupport.initialiseIfSiteAware(getMessageListener()); 
      SwitchSiteSupport.initialiseIfSiteAware(getErrorHandler()); 

      // If this object is a Spring bean, we should now complete the initialisation that the Spring framework attempted earlier 
      if (springBeanInitialisationAttempted && !springBeanInitialised) { 
       springBeanInitialised = true; 
       super.initialize(); 
       if (isAutoStartup()) { 
        start(); 
       } 
      } 
     } 

     @Override 
     public void configureForSite(final MultiHostConnectionFactory configuredConnectionFactory) { 
      setConnectionFactory(configuredConnectionFactory); 
      SwitchSiteSupport.configureIfSiteAware(getMessageListener(), configuredConnectionFactory); 
      SwitchSiteSupport.configureIfSiteAware(getErrorHandler(), configuredConnectionFactory); 

      setInstanceNameForLogging(SwitchSiteSupport.replaceWithSiteAlias(instanceNameForLogging, configuredConnectionFactory)); 
     } 

     @Override 
     //CHECKSTYLE:OFF Unfortunately the parent springframework class throws and exception, so so do we 
     protected void doStart() throws Exception { 
      //CHECKSTYLE:ON 
      if (autoStart) { 
       logger.debug("Starting message listener " + instanceNameForLogging); 
       super.doStart(); 
       logger.debug("Started message listener " + instanceNameForLogging); 
      } 
     } 

     /** 
     * Start this listener 
     */ 
     public void start() { 
      autoStart = true; 
      try { 
       doStart(); 
       //CHECKSTYLE:OFF Unfortunately the parent springframework class throws and exception, so that is what we catch 
      } catch (Exception e) { 
       //CHECKSTYLE:ON 
       logger.error(ERROR_AMQP_FAILED_TO_START_LISTENER, e); 
      } 
     } 

     /** 
     * Stop listener 
     */ 
     public void stop() { 
      logger.info(INFO_AMQP_STOPPING_LISTENER, getBeanName()); 
      autoStart = false; 
      doStop(); 
     } 

     /** 
     * Stop and start this listener 
     */ 
     public void restart() { 
      stop(); 
      start(); 
     } 

     /** 
     * Store the errorHandler in a subclass-specific property so that we can retrieve it later 
     * @param errorHandler errorHandler 
     */ 
     @Override 
     public void setErrorHandler(final ErrorHandler errorHandler) { 
      this.exposedErrorHandler = errorHandler; 
      super.setErrorHandler(errorHandler); 
     } 

     /** 
     * Return the exposed errorHandler 
     * @return errorHandler 
     */ 
     public ErrorHandler getErrorHandler() { 
      return exposedErrorHandler; 
     } 

     public void setInstanceNameForLogging(final String instanceNameForLogging) { 
      this.instanceNameForLogging = instanceNameForLogging; 
     } 

     @Override 
     public String toString(){ 
      return ToStringBuilder.reflectionToString(this); 
     } 
    } 

ответ

0

Хороший улов в любом случае!

Я только сейчас работаю над этим кодом для Spring AMQP 1.4.

Не могли бы вы поделиться:

  1. Конфигурация для ListenerContainer, на котором висят
  2. A отладочный анализ для кода из этого redeclareElementsIfNecessary()

На самом деле сейчас, что код выглядит как:

if (queueNames.contains(queue.getName()) && queue.isAutoDelete() 
     && this.rabbitAdmin.getQueueProperties(queue.getName()) == null) { 
      if (logger.isDebugEnabled()) { 
       logger.debug("At least one auto-delete queue is missing: " + queue.getName() 
           + "; redeclaring context exchanges, queues, bindings."); 
      } 
      this.rabbitAdmin.initialize(); 
      break; 
} 

Таким образом, это может быть только heppen на очереди auto-delete.

Или у вас есть еще один снимок? ..

UPDATE

По вашему ThreadDump. Это незаконно:

at com.vocalink.ips.amqp.AmqpMessageListenerManager.initialise(AmqpMessageListenerManager.java:106) 
     at com.vocalink.ips.amqp.SwitchSiteSupport.initialiseIfSiteAware(SwitchSiteSupport.java:29) 
     at com.vocalink.ips.system.config.AbstractSiteAwareComponentCachingFactory.createAndConfigureSiteAwareComponent(AbstractSiteAwareComponentCachingFactory.java:51) 

Вы не можете start компонент в фазе инициализации. Или оставить его в контейнере или просто сделать start вручную где-нибудь во время выполнения, когда все ваши компоненты уже созданы.

Например, вы можете сделать это, используя ApplicationListener<ContextRefreshedEvent>.

+0

Пожалуйста, также возьмите свалку потока, когда находитесь в состоянии «висели». Также, пожалуйста, используйте последнюю версию в выпуске - последняя версия 1.3.x - 1.3.6; с версии 1.3.0 было исправлено множество ошибок. Меня интересует, что привело вас к версии 1.3.0, а не к текущему выпуску. На странице [project page] (http://projects.spring.io/spring-amqp/) четко отображается текущее. –

+0

Просто для объяснения; основной поток удерживает блокировку в контексте приложения, которая необходима контейнеру во время 'start()'. Вы должны __never__ выполнять какую-либо реальную деятельность (например, запуск контейнеров) во время инициализации бина (например, в 'FactoryBean.getObject()'). По определению контекст приложения все еще создается. Вам просто повезло, что у вас не было таких проблем до сих пор. –

+0

Привет, Артем/Гэри, ваше предложение действительно помогло решить проблему сортировки, мы использовали SmartLifecycle в нашем компоненте Listnermanger, который фактически запускает контейнеры-слушатели в методе initialise(), который вызывается при запуске контейнера, Я теперь использовал ApplicationListener , чтобы контейнеры-слушатели запускались после того, как контекст приложения get был инициализирован, теперь он исправил зависание и задержки при запуске нашего приложения. –

Смежные вопросы