2014-09-09 2 views
1

Мое приложение обрабатывает входящие сообщения, которые помещаются в одну очередь ActiveMQ (с именем «incoming.queue»). У меня есть MessageListener, который обрабатывает сообщения, и все работает хорошо. Мой Java конфигурации ниже:Потребляйте несколько очередей ActiveMQ через ту же ActiveMQConnectionFactory

@Configuration 
@ComponentScan(basePackages="uk.co.domain") 
public class JmsConfig { 

    @Bean 
    public ActiveMQConnectionFactory connectionFactory() { 
     ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); 
     activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616"); 
     return activeMQConnectionFactory; 
    } 

    @Bean 
    public DefaultMessageListenerContainer jmsListenerContainer() { 
     DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer(); 
     dmlc.setConnectionFactory(connectionFactory()); 
     dmlc.setDestination(new ActiveMQQueue("incoming.queue")); 
     dmlc.setMessageListener(new QueueProcessor()); 
     dmlc.setConcurrentConsumers(50); 
     return dmlc; 
    } 
} 



public class QueueProcessor implements MessageListener { 

    public void onMessage(Message message) { 

     // process the message 
    } 
} 

Другой отдел вносит изменения вверх по течению, так что сообщения будут распространяться среди трех различных очередей, названный «high_priority.queue», «med_priority.queue» и «low_priority.queue ». Количество одновременных потребителей для каждой очереди должно быть 50, 20 и 5 соответственно.

В моем коде тот же QueueProcessor будет отвечать за обработку сообщений, но я не уверен, как изменить мой конфиг, чтобы создать три прослушивателя сообщений вместо одного. Любые советы приветствуются.

+0

Правильная вещь, чтобы сделать, чтобы сохранить развязку было бы получить вверх по течению людей, использующих очереди для приоритетов в используйте встроенные приоритеты JMS, тогда вам не придется ничего менять. Нет причин для разделения очередей на основе приоритета. –

+0

Я согласен с вашим комментарием, но, к сожалению, я не контролирую происходящие изменения. –

ответ

2

Я решил эту проблему, просто создав несколько компонентов - не думаю, что это будет так легко:

@Bean 
public DefaultMessageListenerContainer highPriorityQueue() { 
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer(); 
    dmlc.setConnectionFactory(connectionFactory()); 
    dmlc.setDestination(new ActiveMQQueue("high_priority.queue")); 
    dmlc.setMessageListener(new QueueProcessor()); 
    dmlc.setConcurrentConsumers(50); 
    return dmlc; 
} 

@Bean 
public DefaultMessageListenerContainer medPriorityQueue() { 
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer(); 
    dmlc.setConnectionFactory(connectionFactory()); 
    dmlc.setDestination(new ActiveMQQueue("med_priority.queue")); 
    dmlc.setMessageListener(new QueueProcessor()); 
    dmlc.setConcurrentConsumers(20); 
    return dmlc; 
} 

@Bean 
public DefaultMessageListenerContainer lowPriorityQueue() { 
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer(); 
    dmlc.setConnectionFactory(connectionFactory()); 
    dmlc.setDestination(new ActiveMQQueue("low_priority.queue")); 
    dmlc.setMessageListener(new QueueProcessor()); 
    dmlc.setConcurrentConsumers(5); 
    return dmlc; 
} 
0

вы можете создать общий класс процессора и установить соединения, адресатов и другие конфигурации этого класса с помощью конструктора, указав имя очереди и подсчеты потребителей как входы кластеров процессоров, которые реализуют этот общий класс. также вы можете объявить конструкторы кладов как бобы для весеннего conf.

Смежные вопросы