2017-02-10 3 views
1

Я пытаюсь использовать службу обмена сообщениями для отправки сообщений другим службам при создании пользователя в нашей системе. Службы, получающие сообщения, представляют собой весовые весовые приложения Spring Boot (в нашем случае - контейнеры в среде Kubernetes).Как я могу использовать ActiveMQ и Spring Integration JMS для приема сообщений один раз в нескольких группах?

Desired behavior, one recipient per group

Я хотел бы:

  1. Каждый стручок в каждой службы, сконфигурированный для приема сообщений.

  2. Один (и только один) блок в каждой службе фактически получает каждое сообщение.

  3. Сообщение может быть получено несколькими службами таким образом.

Таким образом, на картинке сообщение «Новый пользователь» принимается одним модулем в службе уведомлений и одним модулем в службе ведения журнала.

Я настроил ActiveMQ и Spring Integration для отправки и получения сообщений и получил его работу как a) очередь (один получатель), и b) Тема (кто подписали ее на получение). Проблема заключается в следующем:

a) В очереди одна получатель означает, что уведомления будут получать ее, но регистрация не будет (или наоборот).

Queue, one recipient

б) С темой, кто бы ни подписался означает, что все шесть стручков получит его.

Topic, every subscriber receives

Я чувствую, что я хочу, это группировка, как «один получатель типа УВЕДОМЛЕНИЯ и один получатель типа ЖУРНАЛИРОВАНИЯ», но я не знаю, как осуществить это. Похоже, что это возможно с помощью шаблона маршрутизации сообщений, но мне было интересно, можно ли полностью его использовать с помощью Spring Integration.

Некоторые коды. Конфигурация для отправителя:

import org.apache.activemq.spring.ActiveMQConnectionFactory; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.jms.core.JmsTemplate; 

@Configuration 
public class MessagingConfig {  
    @Bean 
    public ActiveMQConnectionFactory connectionFactory(){ 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
     connectionFactory.setBrokerURL("tcp://myactivemq:61616"); 
     return connectionFactory; 
    } 

    @Bean 
    public JmsTemplate jmsTemplate(){ 
     JmsTemplate template = new JmsTemplate(); 
     template.setConnectionFactory(connectionFactory()); 
     template.setDefaultDestinationName("user-dest"); 
     return template; 
    }  
} 

сервис, который посылает сообщение (упрощенный):

@Service 
public class MessageSender { 
    private final JmsTemplate jmsTemplate; 

    @Autowired 
    public MessageSender(JmsTemplate jmsTemplate) { 
    this.jmsTemplate = jmsTemplate; 
    } 

    public void sendMessage(String userId) {  
    jmsTemplate.send(new MessageCreator() { 
     @Override 
     public Message createMessage(Session session) throws JMSException{ 
     return session.createTextMessage("NEW USER:" + userId); 
     } 
    }); 
    } 
} 

Конфигурация приемника:

import org.apache.activemq.spring.ActiveMQConnectionFactory; 
import org.springframework.context.annotation.Bean; 
import org.springframework.jms.annotation.EnableJms; 
import org.springframework.jms.config.DefaultJmsListenerContainerFactory; 

@Configuration 
@EnableJms 
public class ReceiverConfig { 

    private ActiveMQConnectionFactory getConnectionFactory() { 
     ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
     connectionFactory.setBrokerURL("tcp://myactivemq:61616"); 
     return connectionFactory; 
    } 

    @Bean 
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { 
     DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(getConnectionFactory()); 
     factory.setConcurrency("1-1"); 
     return factory; 
    } 
} 

и службой приема сообщений.

import javax.jms.JMSException; 
import javax.jms.TextMessage; 
import org.springframework.jms.annotation.JmsListener; 

@Service 
public class MessageReceiver { 

    @JmsListener(destination = "user-dest") 
    public void receiveMessage(final TextMessage message) throws JMSException { 
    // Do something with message.getText() 
    } 
} 

Это работает, но действует как очередь, только один получатель. Любая идея, как это может быть получено одним модулем Notification Service и одним модулем службы ведения журнала?

+1

Попробуйте следующее: http://activemq.apache.org/virtual-destinations.html –

ответ

0

FYI. Я закончил использование RabbitMQ, возможно, потому, что нашел документацию намного проще.С RabbitMQ, решение было довольно просто:

  • Отправить сообщение для обмена
  • Создание очереди и связать очередь для обмена
    • Я использовал пользовательский интерфейс управления, но привязка может быть в коде тоже.
    • Из моего примера выше я создал две очереди: одну для уведомления и одну для ведения журнала.
  • Службы уведомления и регистрации слушают их соответствующие очереди.

Таким образом, служба пользователя отправляет сообщение на свой Exchange, обе очереди забирают его, и только один модуль каждой службы прослушивания получает сообщение. Именно то, что я искал.

Смежные вопросы