2015-07-13 3 views
1

Если я использую QOS, тип 1 означает, что брокер будет продолжать отправлять сообщение подписчику, пока оно не получит подтверждение. Как я могу установить или вернуть ack? Пожалуйста, кто-нибудь пролил свет на это.Подтверждение MQTT

Это мой исходный код:

import java.io.BufferedWriter; 
import java.io.File; 
import java.io.FileWriter; 
import java.io.IOException; 
import java.io.PrintWriter; 
import java.net.URISyntaxException; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.Properties; 
import java.util.Vector; 

import org.fusesource.hawtbuf.Buffer; 
import org.fusesource.hawtbuf.UTF8Buffer; 
import org.fusesource.mqtt.client.Callback; 
import org.fusesource.mqtt.client.CallbackConnection; 
import org.fusesource.mqtt.client.Listener; 
import org.fusesource.mqtt.client.MQTT; 
import org.fusesource.mqtt.client.QoS; 
import org.fusesource.mqtt.client.Topic; 

import com.adventnet.management.log.Log; 
import com.adventnet.nms.util.NmsLogMgr; 
public class DefaultMqttListener implements IMqttListener,Runnable{ 

    long count = 0; 
    long start = System.currentTimeMillis(); 
    private HashMap serverDetailsHash; 
    public DefaultMqttListener(HashMap serverProp) 
    { 
     this.serverDetailsHash = serverProp; 
    } 
    CallbackConnection myconnection; 
    @Override 
    public void init() { 
     MQTT mqtt = new MQTT(); 
     String user = env("APOLLO_USER", (String)serverDetailsHash.get("userName")); //No I18N 
     String password = env("APOLLO_PASSWORD", (String)serverDetailsHash.get("password")); //No I18N 
     String host = env("APOLLO_HOST", (String)serverDetailsHash.get("hostName")); //No I18N 
     int port = Integer.parseInt(env("APOLLO_PORT", (String)serverDetailsHash.get("port"))); 
     try { 
      mqtt.setHost(host, port); 
      mqtt.setUserName(user); 
      mqtt.setPassword(password); 
      final CallbackConnection connection = mqtt.callbackConnection(); 
      myconnection = connection; 
      connection.listener(new org.fusesource.mqtt.client.Listener() { 
       public void onConnected() { 
       } 
       public void onDisconnected() { 
       } 
       public void onFailure(Throwable value) { 
        value.printStackTrace(); 
        System.exit(-2); 
       } 
       public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) { 
         long time = System.currentTimeMillis(); 
         callback(topic, msg, ack,connection,time); 
       } 
      }); 
      connection.connect(new Callback<Void>() { 
       @Override 
       public void onSuccess(Void value) { 
        NmsLogMgr.M2MERR.log("MQTT Listener connected in ::::", Log.SUMMARY); 
        ArrayList getTopics = (ArrayList)serverDetailsHash.get("Topics"); 
        for(int i=0;i<getTopics.size();i++) 
        { 
         HashMap getTopic = (HashMap)getTopics.get(i); 
         String topicName = (String) getTopic.get("topicName"); 
         String qosType = (String) getTopic.get("qosType"); 
         Topic[] topic = {new Topic(topicName, getQosType(qosType))}; 
         connection.subscribe(topic, new Callback<byte[]>() { 
          public void onSuccess(byte[] qoses) { 
          } 
          public void onFailure(Throwable value) { 
           value.printStackTrace(); 
           System.exit(-2); 
          } 
         }); 
        } 
        //Topic[] topics = {new Topic("adminTest", QoS.AT_LEAST_ONCE),new Topic("adminTest1", QoS.AT_LEAST_ONCE)}; 
       } 
       @Override 
       public void onFailure(Throwable value) { 
        value.printStackTrace(); 
        System.exit(-2); 
       } 
      }); 

      // Wait forever.. 
      synchronized (Listener.class) { 
       while(true){ 
        Listener.class.wait();} 

      } 
     } catch (URISyntaxException e1) { 
      // TODO Auto-generated catch block 
      e1.printStackTrace(); 
     } 
     catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    private static String env(String key, String defaultValue) { 
     String rc = System.getenv(key); 
     if(rc== null){ 
      return defaultValue;} 
     return rc; 
    } 

    @Override 
    public void callback(UTF8Buffer topic, Buffer msg, Runnable ack, CallbackConnection connection, long time) { 
     // TODO Auto-generated method stub 
     try { 
      String Message = msg.utf8().toString(); 
      MQTTMessage mqttMsg = new MQTTMessage(); 
      mqttMsg.setMQTTMessage(Message); 
      mqttMsg.setTime(time); 
      mqttMsg.setTopic(topic); 
      mqttMsg.sethostName((String) serverDetailsHash.get("hostName")); 
      MQTTCacheManager.mgr.addToCache(mqttMsg); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 
      NmsLogMgr.M2MERR.log("myconnection closed", Log.SUMMARY); 
      myconnection.disconnect(new Callback<Void>() { 
      @Override 
      public void onSuccess(Void value) { 
       System.exit(0); 
      } 
      @Override 
      public void onFailure(Throwable value) { 
       value.printStackTrace(); 
       System.exit(-2); 
      } 
     }); 

    } 

    @Override 
    public void run() { 
     this.init(); 
     // TODO Auto-generated method stub 
    } 
    public QoS getQosType(String name) 
    { 
     Properties qosContainer = new Properties(); 
     qosContainer.put("0", QoS.AT_MOST_ONCE); 
     qosContainer.put("1", QoS.AT_LEAST_ONCE); 
     qosContainer.put("2", QoS.EXACTLY_ONCE); 
     QoS qosName = (QoS) qosContainer.get(name); 
     return qosName; 
    } 
} 

ответ

1

Вы не отправить подтверждение в коде вообще, все это должно быть обработано с помощью библиотеки MQTT вы используете.

Пакеты QOS ack находятся между издателем и брокером, а затем отдельно между брокером и любыми подписчиками.

+0

Но если я использую QOS1 (ATLEAST_ONCE) или QOS2 (EXACTLY_ONCE), я продолжаю получать одинаковые сообщения каждые 10 секунд. Это повторяется до тех пор, пока я не отключу соединение Broker-Client. Даже после повторного подключения сообщение поступает от брокера каждые 10 секунд. –

+0

ОК, тогда это звучит как брокер или библиотека, которую вы используете, сломана или не поддерживает ничего, кроме QOS 0. – hardillb

+0

Один из моих коллег предлагает этот метод. public void onPublish (тема UTF8Buffer, Buffer msg, Runnable ack) { long time = System.currentTimeMillis(); callback (тема, msg, ack, соединение, время); ack.run(); Это правильно ??? –

0

Я не использовал библиотеку Java, но вам нужно подписаться на тему, определяющую уровень QoS 1 (чтобы иметь хотя бы одну доставку) или уровень QoS 2 (чтобы иметь ровно один раз доставки). В этих случаях базовая библиотека отправляет пакеты ACK брокеру.

Paolo.

Смежные вопросы