2014-04-15 3 views
1

У меня супер простой сценарий: один брокер и один потребитель с долгой подпиской. Это код моего потребительского приложения:ActiveMQ отказоустойчивости, похоже, не работает

package test; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnectionFactory; 

import pojo.Event; 
import pojo.StockUpdate; 

public class Consumer 
{ 

    private static transient ConnectionFactory factory; 
    private transient Connection connection; 
    private transient Session session; 
    public static int counter = 0; 

    public Consumer(String brokerURL) throws JMSException 
    { 
     factory = new ActiveMQConnectionFactory(brokerURL); 
     connection = factory.createConnection(); 
     connection.setClientID("CLUSTER_CLIENT_1"); 
     connection.start(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    } 

    public void close() throws JMSException 
    { 
     if (connection != null) 
     { 
      connection.close(); 
     } 
    } 

    public static void main(String[] args) throws JMSException 
    { 

     try 
     { 
      // extract topics from the rest of arguments 
      String[] topics = new String[2]; 
      topics[0] = "CSCO"; 
      topics[1] = "ORCL"; 

      // define connection URI 
      Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true"); 

      for (String stock : topics) 
      { 
       try 
       { 
        Destination destination = consumer.getSession().createTopic("STOCKS." + stock); 
        // consumer.getSession(). 
        MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic) destination, "STOCKS_DURABLE_CONSUMER_" + stock); 
        messageConsumer.setMessageListener(new Listener()); 
       } 
       catch (JMSException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     } 
     catch (Throwable t) 
     { 
      t.printStackTrace(); 
     } 

    } 

    public Session getSession() 
    { 
     return session; 
    } 

} 

class Listener implements MessageListener 
{ 

    public void onMessage(Message message) 
    { 
     try 
     { 
      TextMessage textMessage = (TextMessage) message; 
      String json = textMessage.getText(); 
      Event event = StockUpdate.fromJSON(json, StockUpdate.class); 
      System.out.println("Consumed message #:" + ++Consumer.counter + "\n" + event); 
     } 
     catch (Exception e) 
     { 
      e.printStackTrace(); 
     } 
    } 

} 

Вот мой activemq.xml

<beans 
    xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

    <!-- Allows us to use system properties as variables in this configuration file --> 
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
     <property name="locations"> 
      <value>file:${activemq.conf}/credentials.properties</value> 
     </property> 
    </bean> 

    <!-- 
     The <broker> element is used to configure the ActiveMQ broker. 
    --> 
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="R6_cluster_broker1" persistent="true"> 

     <networkConnectors> 
      <networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/> 
     </networkConnectors> 

     <destinationPolicy> 
      <policyMap> 
       <policyEntries> 
       <policyEntry topic=">" > 
        <!-- The constantPendingMessageLimitStrategy is used to prevent 
         slow topic consumers to block producers and affect other consumers 
         by limiting the number of messages that are retained 
         For more information, see: 

         http://activemq.apache.org/slow-consumer-handling.html 

        --> 
        <pendingMessageLimitStrategy> 
        <constantPendingMessageLimitStrategy limit="1000"/> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
       </policyEntries> 
      </policyMap> 
     </destinationPolicy> 


     <!-- 
      The managementContext is used to configure how ActiveMQ is exposed in 
      JMX. By default, ActiveMQ uses the MBean server that is started by 
      the JVM. For more information, see: 

      http://activemq.apache.org/jmx.html 
     --> 
     <managementContext> 
      <managementContext createConnector="false"/> 
     </managementContext> 

     <!-- 
      Configure message persistence for the broker. The default persistence 
      mechanism is the KahaDB store (identified by the kahaDB tag). 
      For more information, see: 

      http://activemq.apache.org/persistence.html 
     --> 
     <persistenceAdapter> 
      <kahaDB directory="/work/temp/kahadb"/> 
     </persistenceAdapter> 


      <!-- 
      The systemUsage controls the maximum amount of space the broker will 
      use before disabling caching and/or slowing down producers. For more information, see: 
      http://activemq.apache.org/producer-flow-control.html 
      --> 
      <systemUsage> 
      <systemUsage> 
       <memoryUsage> 
        <memoryUsage percentOfJvmHeap="70" /> 
       </memoryUsage> 
       <storeUsage> 
        <storeUsage limit="100 gb"/> 
       </storeUsage> 
       <tempUsage> 
        <tempUsage limit="50 gb"/> 
       </tempUsage> 
      </systemUsage> 
     </systemUsage> 

     <!-- 
      The transport connectors expose ActiveMQ over a given protocol to 
      clients and other brokers. For more information, see: 

      http://activemq.apache.org/configuring-transports.html 
     --> 
     <transportConnectors> 
      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> --> 
     </transportConnectors> 

     <!-- destroy the spring context on shutdown to stop jetty --> 
     <shutdownHooks> 
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
     </shutdownHooks> 

    </broker> 

    <!-- 
     Enable web consoles, REST and Ajax APIs and demos 
     The web consoles requires by default login, you can disable this in the jetty.xml file 

     Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details 
    --> 
    <import resource="jetty.xml"/> 

</beans> 

Когда у меня есть как брокер и потребительский бег, а затем остановить брокера мой потребитель выходит несколько минут после того, как. Насколько я понял, он должен попытаться снова подключиться, но это не так. Что я делаю неправильно, пожалуйста, посоветуйте.

! NOTE! Я запускаю своего потребителя в Eclipse, я не создаю отдельную банку для этой задачи.

Я обновил свой брокер до последней версии 5.9.1 и сделал то же самое с моим потребителем. Результат тот же - после того, как я остановил брокера, мой потребитель умирает через несколько секунд после этого. Он отлично работает, если брокер работает и работает.

+0

Я обновил свой брокер до последней версии 5.9.1 и сделал то же самое с моим потребителем. Результат тот же - после того, как я остановил брокера, мой потребитель умирает через несколько секунд после этого. Он отлично работает, если брокер работает и работает. –

ответ

1

Хорошо, проблема была в самом деле в моем коде: не было ничего, что могло бы предотвратить выход основного потока. Поскольку поток, реализующий переход на другой ресурс, является потоком демона, потребительское приложение прекратилось сразу после того, как не было ничего, на что можно было бы удержаться (без потоков без демона).

+0

Я только что наткнулся на ту же «проблему». Спасибо, что подтвердили причину! Какое решение вы внедрили, чтобы избежать остановки основного потока? –

+0

Насколько я помню, я добавил крюк остановки. –

0

Скорее всего вы используете версию ActiveMQ, в которой есть ошибка, которая приводит к появлению всех потоков демонов, что означает, что клиент не работает. Перейдите к более поздней версии, такой как v5.9.1, и посмотрите, поможет ли это. Если не размещать больше информации, поскольку вы действительно не предоставили много.

+0

Тим, я использую Apache ActiveMQ 5.9.0 Какую дополнительную информацию вы хотели бы мне написать? –

+0

У меня есть обновленный вопрос со всей информацией, которую у меня есть. –

+0

Я могу подтвердить поведение с помощью '5.13.2'. –

Смежные вопросы