2013-01-15 2 views
18

Я исследую решения для очередей для одного из приложений моей команды. В идеале мы хотели бы что-то, что можно настроить как в виде легкого, в процессе брокера (для обмена сообщениями с низкой пропускной способностью между потоками), так и в качестве внешнего брокера. Есть ли там сервер MQ, который может это сделать? Большинство из них, похоже, требуют установки в качестве внешнего объекта. Похоже, что ZeroMQ ближе всего подходит к процессинговому решению, но, похоже, он больше похож на «UDP-сокет на стероидах», и нам нужна надежная доставка.Есть ли серверы MQ, которые могут запускаться в Java?

+1

Я думаю, что ответы на http://stackoverflow.com/questions/2507536/lightweight-jms-broker содержат интересную информацию (например, [ffmq] (http://ffmq.sourceforge.net/index .html)). ActiveMQ - еще один, хотя и более тяжелый кандидат, но он также встраивается. – fvu

+1

Как и @fvu, ActiveMQ немного тяжелее ZeroMQ, но он отлично работает как встроенный процесс. И если вы используете Spring, его очень легко настроить. –

+0

ZeroMQ работает среди прочего поверх TCP (не UDP), который обеспечивает надежный транспорт. Однако вы имеете в виду постоянную очередь? То есть на диске? –

ответ

10

Как мы сказали, ActiveMQ немного тяжелее, чем ZeroMQ, но он отлично работает как встроенный процесс. Вот простой пример с Spring и ActiveMQ.

В сообщении слушателя, который будет использоваться для проверки очереди: конфигурация

public class TestMessageListener implements MessageListener { 

    private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class); 

    @Override 
    public void onMessage(Message message) { 

     /* Receive the text message */ 
     if (message instanceof TextMessage) { 

      try { 
       String text = ((TextMessage) message).getText(); 
       System.out.println("Message reception from the JMS queue : " + text); 

      } catch (JMSException e) { 
       logger.error("Error : " + e.getMessage()); 
      } 

     } else { 
      /* Handle non text message */ 
     } 
    } 
} 

ActiveMQ контекста:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
         http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> 
     <property name="brokerURL"> 
      <value>tcp://localhost:61617</value> 
     </property> 
    </bean> 

    <bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> 
     <constructor-arg ref="jmsQueueConnectionFactory" /> 
    </bean> 

    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg value="messageQueue" /> 
    </bean> 

    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 
     <constructor-arg ref="pooledJmsQueueConnectionFactory" /> 
     <property name="pubSubDomain" value="false"/> 
    </bean> 

    <bean id="testMessageListener" class="com.example.jms.TestMessageListener" /> 

    <bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" /> 
     <property name="destination" ref="QueueDestination" /> 
     <property name="messageListener" ref="testMessageListener" /> 
     <property name="concurrentConsumers" value="5" /> 
     <property name="acceptMessagesWhileStopping" value="false" /> 
     <property name="recoveryInterval" value="10000" /> 
     <property name="cacheLevelName" value="CACHE_CONSUMER" /> 
    </bean> 

</beans> 

Тест JUnit:

@ContextConfiguration(locations = {"classpath:/activeMQ-context.xml"}) 
public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests { 

    @Autowired 
    private JmsTemplate template; 

    @Autowired 
    private ActiveMQDestination destination; 

    @Test 
    public void testJMSFactory() { 
     /* sending a message */ 
     template.convertAndSend(destination, "Hi"); 

     /* receiving a message */ 
     Object msg = template.receive(destination); 
     if (msg instanceof TextMessage) { 
      try { 
       System.out.println(((TextMessage) msg).getText()); 
      } catch (JMSException e) { 
       System.out.println("Error : " + e.getMessage()); 
      } 
     } 
    } 
} 

зависимостей добавить к pom.xml:

<!-- Spring --> 
<dependency> 
    <groupId>org.springframework</groupId> 
    <artifactId>spring-jms</artifactId> 
    <version>${org.springframework-version}</version> 
</dependency> 

<!-- ActiveMQ --> 
<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-all</artifactId> 
    <version>5.6.0</version> 
    <scope>compile</scope> 
</dependency> 

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-pool</artifactId> 
    <version>5.6.0</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.activemq</groupId> 
    <artifactId>activemq-core</artifactId> 
    <version>5.6.0</version> 
</dependency> 
+0

Это получилось отлично, спасибо. –

+0

Проблемы, которые мы видели с помощью ActiveMQ (и почему я сейчас выполняю быстрый поиск альтернатив): (a) когда вы закрываете его, он не останавливает все свои потоки. (б) многие из его API-сервисов автоматически запускают службы при построении, а также имеют методы start(), что делает API неясным и, возможно, способствующим (а). (c) множество проблем с многопоточным доступом, которые на сегодняшний день были «исправлены» ими, добавляя больше синхронизированных блоков. (d) сомнительная безопасность, особенно при использовании в режиме обнаружения. (Pro-tip: настройте сервер очереди сообщений изгоев для забавных ситуаций вокруг офиса!) – Trejkaz

2

Клиент WebSphere MQ имеет возможность выполнять multicast pub/sub. Это обеспечивает возможность «клиент-клиент», которая обходит диспетчер очереди, хотя диспетчер очереди необходим для установления соединения.

Смежные вопросы