Именно в этом контексте:ActiveMQ + MQTT + подписаться на "ActiveMQ.Advisory.Connection"
приложение Java, присоединяется к теме "ActiveMQ.Advisory.Connection" от ActiveMQ 5.9.1 через MQTT (PAHO 0,4 .0):
public class SupervisorMqttClient implements MqttCallback {
private MqttClient client = null;
private MemoryPersistence persistence = null;
private MqttConnectOptions connOpts = null;
private final int STATUS_OK = 0;
private final int STATUS_ERROR = 1;
private String mqttServer = null;
private String clientId = null;
private int status = STATUS_OK;
public SupervisorMqttClient() {
try {
this.init();
} catch (MqttException e) {
Logger.error(e.getLocalizedMessage());
Logger.debug(e);
}
}
private void init() throws MqttException {
Properties props = PropertiesManager.getInstance("supervisor");
mqttServer = props.getProperty("supervisor.mqtt.server");
String supervisorID = props.getProperty("supervisor.mqtt.client.number");
clientId = Supervisor.APP_NAME+"-"+supervisorID;
connOpts = new MqttConnectOptions();
connOpts.setKeepAliveInterval(30);
connOpts.setCleanSession(true); // important non-durable
persistence = new MemoryPersistence();
client = new MqttClient(mqttServer, clientId, persistence);
connectAndSubscribe();
}
private void connectAndSubscribe() throws MqttSecurityException, MqttException {
try {
client.connect(connOpts);
client.setCallback(this);
client.subscribe("ActiveMQ/Advisory/Connection");
} catch (MqttSecurityException e) {
Logger.error(e.getLocalizedMessage());
Logger.debug(e);
} catch (MqttException e) {
Logger.error(e.getLocalizedMessage());
Logger.debug(e);
processError(e);
}
}
public void publish(String orderType, JSONObject jsonExtraData) {
if (status == STATUS_ERROR) {
connectAndSubscribe();
}
if (status == STATUS_OK) {
// some code here
}
}
@Override
public void connectionLost(Throwable err) {
Logger.info("Connection lost");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
Logger.info("deliveryComplete");
}
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
System.out.println("MQTT Mesage Arrived[" + topic + "] Msg[" + msg.toString() + "]");
}
private void processError(MqttException e) {
status = STATUS_ERROR;
try {
if (client.isConnected()) {
Logger.error("disconnecting");
client.disconnect();
}
} catch (MqttException ex) {
Logger.error(ex.getLocalizedMessage());
Logger.debug(ex);
}
}
}
Связь с ActiveMQ установлена в порядке. Эта тема предоставляет информацию о соединениях (открыть/закрыть) в ActiveMQ, но моя проблема заключается в том, что сообщения Ловля пустые:
MQTT электронного письмо Прибыл [ActiveMQ/Advisory/Connection] Msg []
Есть ли способ поймать их с помощью MQTT? или я должен использовать JMS для этого?
Thanks, Jon Ander.
Спасибо, Тим, ACK 200. Было бы неплохо, если бы ActiveMQ поделился информацией о соединениях с использованием стандартных форматов, таких как JSON. Все протоколы были бы признательны. –
Вы можете открыть запрос на повышение, чтобы объекты полезной нагрузки отправлялись как XML или JSON, подобно тому, как клиент STOMP делает что-то. Поскольку нет заголовков сообщений, нет способа указать тип контента, который ваш клиент должен поддерживать независимо от того, что отправлено по умолчанию. –