2015-07-15 4 views
2

Я пытаюсь создать клиент андроид mqtt в режиме данных настойчивости. Клиент подключается нормально и отправляет данные при активном сетевом подключении. Но если сеть опускается, я не думаю, что сообщения, созданные в течение автономного периода, отправляются на сервер. я пробовал с QOS 1 и QOS 2.android mqtt data persistance, когда офлайн

Должен ли я поймать сообщение, которое не отправлено, и сохранить их в SQLite и повторить попытку, когда сеть снова встанет?

MqttConnection.java

public class MqttConnection implements MqttCallback { 

     private final static String TAG = MqttConnection.class.getName(); 
     private static MqttConnection instance; 
     private MqttAndroidClient client; 
     private Context context; 
     private String mDeviceId; 
     private volatile boolean isConnecting = false; 
     private MqttDefaultFilePersistence mDataStore = null; // Defaults to FileStore 
     private MemoryPersistence mMemStore; // On Fail reverts to MemoryStore 

     private MqttConnection(Context context) { 
      this.context = context; 
      this.client = null; 
     } 

     public static MqttConnection getInstance(Context context) { 
      Log.d(TAG, ".getInstance() entered"); 
      if (instance == null) { 
       instance = new MqttConnection(context); 
      } 
      return instance; 
     } 

     public void connect() { 
      if (client != null) { 
       if (isConnecting) { 
        Log.d(TAG, "Mqtt is connecting"); 
       } 
       client = null; 
      } 
      mDeviceId = "Someid" 
      setConnectingState(true); 
      try { 
       mDataStore = new MqttDefaultFilePersistence(context.getCacheDir().getAbsolutePath()); 
       mMemStore = new MemoryPersistence(); 
       // Construct the MqttClient instance 
       if (mDataStore != null) { 
        client = new MqttAndroidClient(context,MQTT_CONNECTIONURI, mDeviceId, mDataStore); 
       } else { 
        client = new MqttAndroidClient(context,MQTT_CONNECTIONURI, mDeviceId, mMemStore); 
       } 
       client.setCallback(this); 
       MqttActionListener actionLister = new MqttActionListener(context, Constants.ActionStateStatus.CONNECTING); 
       MqttConnectOptions options = new MqttConnectOptions(); 
       options.setCleanSession(false); 
       options.setKeepAliveInterval(1200); 
       client.connect(options, context, actionLister); 
      } catch (MqttException e) { 
       Log.e(TAG, "Exception caught while attempting to connect to server", e.getCause()); 
      } 
     } 


     public void disconnect() { 
      Log.d(TAG, "Disconnected"); 
     } 

     public void subscribe() { 
      Log.d(TAG, "subscribe"); 
     } 

     public void unsubscribe() { 
      Log.d(TAG, "unsubscribe"); 
     } 

     public void publish(String message) { 
    //  Log.d(TAG, ".publish() entered"); 
      String topic = mDeviceId; 
      // check if client is connected 
      if (isMqttConnected()) { 
       // create a new MqttMessage from the message string 
       MqttMessage mqttMsg = new MqttMessage(message.getBytes()); 
       // set retained flag 
       mqttMsg.setRetained(true); 
       // set quality of service 
       mqttMsg.setQos(1); 
       try { 
        // create ActionListener to handle message published results 
        MqttActionListener listener = new MqttActionListener(context, Constants.ActionStateStatus.PUBLISH); 
    //    Log.d(TAG, ".publish() - Publishing " + message + " to: " + topic + ", with QoS: " + qos + " with retained flag set to " + retained); 
        client.publish(topic, mqttMsg, context, listener); 
       } catch (MqttPersistenceException e) { 
        Log.e(TAG, "MqttPersistenceException caught while attempting to publish a message", e.getCause()); 
       } catch (MqttException e) { 
        Log.e(TAG, "MqttException caught while attempting to publish a message", e.getCause()); 
       } 
      } else { 
       connectionLost(null); 
      } 
     } 

     @Override 
     public void connectionLost(Throwable throwable) { 
      setConnectingState(false); 
      Log.d(TAG, "connectionLost"); 
      if (isMqttConnected()) { 
       Log.d(TAG, "Mqtt is connected"); 
      } else { 
       Log.d(TAG, "Mqtt conenction is lost"); 
       reconnect(); 
      } 
     } 

     private void reconnect() { 
      if (isConnecting) { 
       Log.d(TAG, "Mqtt is reconnecting.. hang on"); 
       return; 
      } 
      if (isOnline()) { 
       Log.d(TAG, "We are online so should restart connection"); 
       connect(); 
       return; 
      } else { 
**Log.d(TAG, "we are offline.. should i store the data in SQLite and resend them ???");** 
       return; 
      } 
     } 

     @Override 
     public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
      Log.d(TAG, "messageArrived"); 
     } 

     @Override 
     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
      Log.d(TAG, "deliveryComplete"); 
     } 

     private boolean isMqttConnected() { 
    //  Log.d(TAG, ".isMqttConnected() entered"); 
      boolean connected = false; 
      try { 
       if ((client != null) && (client.isConnected())) { 
        connected = true; 
       } 
      } catch (Exception e) { 
       // swallowing the exception as it means the client is not connected 
      } 
    //  Log.d(TAG, ".isMqttConnected() - returning " + connected); 
      return connected; 
     } 

     synchronized void setConnectingState(boolean isConnecting) { 
      this.isConnecting = isConnecting; 
     } 

     private boolean isOnline() { 
      ConnectivityManager cm = 
        (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE); 
      NetworkInfo netInfo = cm.getActiveNetworkInfo(); 
      return netInfo != null && netInfo.isConnectedOrConnecting(); 
     } 
    } 

ответ

2

В первую очередь в соответствии с этим https://stackoverflow.com/a/31826706/4615587 только сообщения, которые уже на лету получить только сохранялось в QoS 1/2 режимах. И это имеет смысл, поскольку он должен работать с DeliveryTokens, чтобы убедиться, что QoS поддерживается.

Таким образом, вам понадобится реализовать собственный автономный буфер. Я сделал это в одном из моих проектов, используя постоянную очередь заданий https://github.com/path/android-priority-jobqueue.

-Nowa

+0

Большое спасибо, отличная библиотека и хорошо работает ..! – Linus

+0

Paho Android Service 1.1.0 теперь доступен и поддерживает автономную буферизацию и автоматическое повторное подключение. –

Смежные вопросы