2016-12-05 1 views
0

Я искал для всего Интернета. Я пробовал установить чистку «ложь» и qos 1 и 2, но абонент не получает весь контент, когда он приходит в Интернет. Пожалуйста, помогите ... мой кодКак получить все данные, отправленные производителем, когда абонент неактивен в теме Mqtt Java (не сохранено или последнее сообщение)

Example.java (продюсер)

public class Example extends PersonBean { 
public void hey(){ 
String clientId = MqttClient.generateClientId(); 
MemoryPersistence persistence = new MemoryPersistence(); 

    PersonBean pb=new PersonBean(); 
    for(int i=1;i<=5;i++){ 
     Gson gson = new Gson(); 

     Date dt=new Date(); 
     DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
     String currentTime = df.format(dt); 
    pb.setId(i); 
    pb.setName("sai"); 
    pb.setEmail("[email protected]"); 
    pb.setAddress("hyderabad"); 
    pb.setCreatedOn(currentTime); 


    String jsonInString = gson.toJson(pb); 


     try { 
      String broker = "tcp://localhost:1883"; 
      String topicName = "test/mqtt"; 
      int qos = 2; 

    MqttClient mqttClient = new MqttClient(broker,clientId); 
      MqttConnectOptions connOpts = new MqttConnectOptions(); 
      connOpts.setCleanSession(false); 
      mqttClient.connect(connOpts); 

      MqttMessage message = new MqttMessage(jsonInString.getBytes()); 

      message.setQos(qos); 
    message.setRetained(true); 


      MqttTopic topic2 = mqttClient.getTopic(topicName); 
topic2.publish(message); 

     mqttClient.disconnect(); 
     } catch (MqttException me) { 
      System.out.println("reason " + me.getReasonCode() + " - msg " 
        + me.getMessage() + "- loc " + me.getLocalizedMessage() 
        + " - cause " + me.getCause() + "- exception " + me); 

     } 


    }} 

    public static void main(String[] args) { 
    Example ex=new Example(); 
    ex.hey(); 
}} 

и мой

Subscriber.java

public class SubcriberExample implements MqttCallback{ 

MqttClient client; 
public void doDemo() { 
    try { 
     client = new MqttClient("tcp://192.168.4.189:1883", "Sending"); 
     client.connect(); 
     client.setCallback(this); 
     client.subscribe("test/mqtt"); 



    } catch (MqttException e) { 
     e.printStackTrace(); 
    } 
} 

public static void main(String args[]){ 
    SubcriberExample se=new SubcriberExample(); 
    se.doDemo(); 
} 

@Override 
public void connectionLost(Throwable arg0) { 
    // TODO Auto-generated method stub 
    System.out.println("connection lost...."); 
} 

@Override 
public void deliveryComplete(IMqttDeliveryToken arg0) { 
    // TODO Auto-generated method stub 

} 

@Override 
public void messageArrived(String topic, MqttMessage message) throws Exception { 
    // TODO Auto-generated method stub 
    System.out.println("message is : "+message); 
}} 

ответ

3

Вы находитесь на правильный трек, опубликованные сообщения должны быть QoS1/2, чтобы иметь право на проведение в автономной очереди (для автономного абонента).

Однако, из кода выше, кажется, проблема в подписчике. Чтобы абонент MQTT мог получать офлайн-сообщения, он должен иметь постоянный сеанс. То есть абоненту необходимо установить соединение с чистым сеансом = false.

+0

спасибо за ответ .. но его не работает. Я добавил client = new MqttClient («tcp: //192.168.4.189: 1883», «Отправка»); \t MqttConnectOptions mqOptions = new MqttConnectOptions(); \t mqOptions.setCleanSession (false); \t client.connect (mqOptions); \t client.subscribe ("test/mqtt"); \t client.setCallback (this); – Sai

+0

Я думаю, что подписчику также необходимо подписаться с QoS 1/2, иначе QoS будет понижен до QoS 0 (что, я думаю, я по умолчанию с клиентом), и сообщения не будут сохранены. –

+0

У вас есть небольшой пример в Java? – Sai

Смежные вопросы