2015-01-22 2 views
2

Я пытаюсь изучить MQTT и играть с ним. Я написал клиент для публикации и клиент для подписки (см. Ниже).Клиент подписки MQTT не получает сообщений после подключения

Если я запустил клиент подписки, а затем запустил клиент публикации (в то время как подписка запущена), тогда все работает нормально. Мой клиент подписки получает сообщения, опубликованные в теме правильно.

Однако, если я сначала запускаю клиента публикации (т. Е. Публикую сообщение в теме), а затем запускаю клиент подписки, я не получаю никаких сообщений.

Другими словами, если я сначала подключаюсь к суб-клиенту, а затем публикую сообщения с клиентом pub при подключении суб-клиента, все работает нормально. Однако, если я сначала опубликую сообщение, а затем подключитесь к моему суб-клиенту, я не получаю никаких сообщений. Мое понимание заключается в том, что я должен получать сообщения, которые присутствуют в теме, когда я подключаюсь к клиенту и подписаться на эту тему.

Я нашел то, что похоже на аналогичную проблему: Cannot receive already published messages to subscribed topic on mqtt paho, хотя этот случай немного отличается. Я попытался изменить различные настройки QoS или флаг ClearSession, но это не решило проблему.

Любая помощь будет оценена!

Публикация Клиент:

public class MQTT_Client_Pub implements MqttCallback{ 

MqttClient client; 

public static void main(String[] args) { 

    new MQTT_Client_Pub().mqttPub(); 
} 

public void mqttPub(){ 
    try { 
     this.setConnection(); 

     // Connect 
     client.connect(); 

     // Create new message 
     MqttMessage message = new MqttMessage(); 
     message.setPayload("A single test message from b112358".getBytes()); 
     message.setQos(0); 

     // Publish message to a topic 
     System.out.println("Publishing a message."); 
     client.publish("pahodemo/test/b112358", message); 

     // Disconnect 
     client.disconnect(); 

     } catch (MqttException e) { 
     e.printStackTrace(); 
     } catch (Exception e){ 
     e.printStackTrace(); 
     } 
} 

public void setConnection(){ 
    // Client 
    try{ 
     client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_pub"); 
    } catch (MqttException e) { 
     e.printStackTrace(); 
    } 

    // Connection Options 
    MqttConnectOptions options = new MqttConnectOptions(); 

    // Set the will 
    options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true); 

    // Set Callback 
    client.setCallback(this); 
} 

public void deliveryComplete(IMqttDeliveryToken token) { 
    System.out.println("Message delivered to the broker."); 
} 

public void messageArrived(String topic, MqttMessage message) throws Exception {} 

public void connectionLost(Throwable cause) {} 

}

Подписка Клиент:

public class MQTT_Client_Sub implements MqttCallback{ 

MqttClient client; 

public static void main(String[] args) { 

    new MQTT_Client_Sub().mqttSub(); 

} 

public void mqttSub(){ 
    try { 
     // Set connection 
     this.setConnection(); 

     // Connect 
     client.connect(); 

     // Subscribe 

     client.subscribe("pahodemo/test/b112358", 0); 
     // Disconnect 
     // client.disconnect(); 

     } catch (MqttException e) { 
     e.printStackTrace(); 
     } 
} 

public void setConnection(){ 
    try { 
     // Client 
     client = new MqttClient("tcp://iot.eclipse.org:1883", "mqtt_test_b112358_sub"); 
    } catch (MqttException e) { 
     e.printStackTrace(); 
    } 

    // Connection Options 
    MqttConnectOptions options = new MqttConnectOptions(); 
    options.setCleanSession(false); 

    // Set the will 
    options.setWill("pahodemo/clienterrors", "CRASHED - CONNECTION NOT CLOSED CLEANLY".getBytes(),2,true); 

    client.setCallback(this); 
} 

public void deliveryComplete(IMqttDeliveryToken token) {} 

public void messageArrived(String topic, MqttMessage message) throws Exception { 
    System.out.println("Message Arrived: " + message.getPayload() + " on tipic: " + topic.getBytes()); 
} 

public void connectionLost(Throwable cause) {} 

}

ответ

3

Сообщения, опубликованные до абонента подключается и выписывает будет поставляться только в следующих 2 ситуации

  1. Когда сообщения были опубликованы как сохраненные. Это означает, что последнее сообщение по этой теме будет доставлено новому абоненту в момент подписки. Это приведет только к последнему сообщению.

  2. Если клиент был ранее подключен и подписан, он был отключен. Затем публикуется сообщение, и клиент снова соединяется с cleansession = false. (И когда подписка на QOS1/2)

Это может помочь: http://www.thingsprime.com/?p=2897

+0

Просто добавить, для второго сценария, когда клиент парафировано подключается и выписывает, он должен также cleansession установлен в false тогда. – knolleary

+0

Спасибо за отзыв. Я все еще немного неясен. Для случая 1. Предположим, что я подключаюсь к клиенту издателя и публикую сообщение в теме с qos = 0. Затем я подключаюсь к клиенту подписчика и подписался на эту тему в первый раз, когда cleanSession установлен в true, если я получить сообщение, которое было опубликовано первым клиентом? Если я правильно понимаю, ответ должен быть да, но я не получаю опубликованное сообщение в моем субклиенте, когда я запускаю тест. – BD112358

+0

Кроме того, в отношении случая 2. Я также пробовал аналогичный сценарий, как описано. Я подключился к клиенту подписчика. Тогда я прекратил бы мой суб-клиент. Затем я опубликовал сообщение с qos 2 в теме с моим клиентом pub. Послесловие, я бы подключился к моему суб-клиенту с установкой cleanSession в false и подписался на эту тему с помощью qos 2.Если я понимаю пункт 2, я должен получить опубликованное сообщение в этом случае. Однако, когда я проверяю это, я не получаю никакого сообщения. Разве мое понимание? – BD112358