2015-02-04 7 views
2

Я пытаюсь создать приложение, которое подписывается на несколько тем mqtt, получать информацию, обрабатывать ее и формировать xmls, а при обработке запускать событие, чтобы они могли быть отправлены в какое-то облако сервер и успешный ответ оттуда для отправки обратно на канал mqtt.Spring integration MQTT опубликовано и подписаться на несколько тем

<int-mqtt:message-driven-channel-adapter 
    id="mqttAdapter" client-id="${clientId}" url="${brokerUrl}" topics="${topics}" 
    channel="startCase" auto-startup="true" /> 

<int:channel id="startCase" /> 

<int:service-activator id="startCaseService" 
     input-channel="startCase" ref="msgPollingService" method="pollMessages" /> 

    <bean id="mqttTaskExecutor" 
     class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
     <property name="corePoolSize" value="5" /> 
     <property name="maxPoolSize" value="10" /> 
    </bean> 

    <bean id="msgPollingService" class="com.xxxx.xxx.mqttclient.mqtt.MsgPollingService"> 
     <property name="taskExecutor" ref="mqttTaskExecutor" /> 
     <property name="vendorId" value="${vendorId}" /> 
    </bean> 

Мой вопрос, как я могу опубликовать это на несколько каналов, то есть если у меня есть возможность публиковать X сообщение для Y теме. В настоящее время у меня есть ниже:

<int:channel id="outbound" /> 

<int-mqtt:outbound-channel-adapter 
    id="mqtt-publish" client-id="kj" client-factory="clientFactory" 
    auto-startup="true" url="${brokerUrl}" default-qos="0" 
    default-retained="true" default-topic="${responseTopic}" channel="outbound" /> 

    <bean id="eventListner" class="com.xxxx.xxxx.mqttclient.event.EventListener"> 
     <property name="sccUrl" value="${url}" /> 
     <property name="restTemplate" ref="restTemplate" /> 
     <property name="channel" ref="outbound" /> 
    </bean> 

Я могу опубликовать это как:

channel.send(MessageBuilder.withPayload("customResponse").build()); 

Могу ли я сделать что-то вроде:

channel.send(Message<?>, topic) 

ответ

2

Ваша конфигурация выглядит хорошо. Однако MessageChannel является абстракцией для слабосвязанной связи и получает сделку только с Message.

Таким образом, вы запрашиваете a-la channel.send(Message<?>, topic) неверно для концепций обмена сообщениями.

Однако у нас есть трюк. Из AbstractMqttMessageHandler:

String topic = (String) message.getHeaders().get(MqttHeaders.TOPIC); 
..... 
this.publish(topic == null ? this.defaultTopic : topic, mqttMessage, message); 

Итак, что вам нужно от вашего кода заключается в следующем:

channel.send(MessageBuilder.withPayload("customResponse").setHeader(MqttHeaders.TOPIC, topic).build()); 

Другими словами, вы должны отправить Message с mqtt_topic заголовка для достижения динамической публикации от <int-mqtt:outbound-channel-adapter>.

С другой стороны мы не рекомендуем использовать MessageChannel s непосредственно из приложения. <gateway> с сервисным интерфейсом для такого случая для конечного приложения. Где это topic может быть одним из аргументов метода обслуживания, отмеченным как @Header(MqttHeaders.TOPIC)

+0

Спасибо, да, шлюз - это то, что я буду реализовывать, как кажется более подходящим. Кроме того, я могу показаться наивным, поскольку я только начал внедрять системы обмена сообщениями, правильно ли я определяю отдельные каналы для входящих и исходящих? –

+0

Да, это правда. Если вы не собираетесь переносить сообщения из одной темы и отправлять их другим, вам определенно нужно несколько каналов. –

Смежные вопросы