2016-12-20 1 views
3

Я пытаюсь подключиться к брокеру MQTT. Я хочу повторить попытку, если я не смогу подключиться. Я получаю обратный вызов успеха или отказа подключения.RxJava2: невозможно обработать исключение для асинхронного обратного вызова с использованием retryWhen

После прочтения нескольких примеров retryПри обработке асинхронных обратных вызовов я собрал этот код. Он отлично работает, если мне удастся подключиться. Кроме того, он повторяет три раза, если я позвоню e.onError(throwable) синхронно с Flowable. Но это сбой моего приложения для Android, если я вызываю e.onError(throwable) из метода onFailure() callback.

Вот код:

RxJava цепь

createConnectionFlowable(client, options) 
    .subscribeOn(Schedulers.io()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .retryWhen(createRetryFunction()) 
    .subscribe(createConsumer()); 

создать текучий

private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) { 
    return Flowable.create(new FlowableOnSubscribe<String>() { 

     public void subscribe(final FlowableEmitter<String> e) throws Exception { 
       client.connect(options).setActionCallback(new IMqttActionListener() { 
        public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); } 
        public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); } 
       }); 
     } 
    }, BackpressureStrategy.BUFFER); 
} 

создать функцию Retry

private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() { 
    return new Function<Flowable<Throwable>, Publisher<?>>() { 

     public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception { 
      return throwableFlowable.zipWith(
        Flowable.range(1, 3), 
        new BiFunction<Throwable, Integer, Integer>() { 
         public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; } 
        } 
      ) 
      .flatMap(new Function<Integer, Publisher<?>>() { 
       public Publisher<?> apply(Integer integer) throws Exception { 
        return Flowable.timer(integer, TimeUnit.SECONDS); 
       } 
      }); 
     } 
    }; 
} 

Потребитель: сделать все хорошие вещи здесь

private Consumer<String> createConsumer() { 
    return new Consumer<String>() { 
     public void accept(String s) throws Exception { 
      Log.d(TAG, "accept: do important stuff here" + s); 
     } 
    }; 
} 

журналы ошибок

12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful 
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1] 
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1 
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000 
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000 
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful 
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at java.lang.Thread.run(Thread.java:818) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at libcore.io.IoBridge.isConnected(IoBridge.java:234) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at libcore.io.IoBridge.connectErrno(IoBridge.java:171) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at libcore.io.IoBridge.connect(IoBridge.java:122) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at java.net.Socket.connect(Socket.java:884) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 2 more 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err:  at libcore.io.IoBridge.isConnected(IoBridge.java:223) 
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 8 more 
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main 
                    Process: com.work.app, PID: 16769 
                    Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: 

Вопросы

  1. Почему этот код сгенерирует исключение, которое происходит сбой приложения? В идеале, он должен обрабатывать исключение? Что мне здесь не хватает?
  2. Почему он не повторяет попытку 3 раза?
  3. Почему такой же код повторяет попытку, если я звоню e.onError(throwable) синхронно из метода Flowable.subscribe()?

Ссылки

  1. RxJava 1.x retryWhen doc
  2. This blog

ответ

0

Я, наконец, получил эту работу!

Оказывается, что это не проблема с RxJava2, но на пути MQTT (Затмение ПОЗ) запускает обратный вызов IMqttActionListener на главном потоке, даже если клиент был создан на другом потоке !!!.

Простым решением является дождаться, когда клиент подключится к потоку, на котором он был создан. Код разделяет в этом вопросе является правильным для этого метода

@NonNull 
public Flowable<Boolean> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) { 
    return Flowable.create(new FlowableOnSubscribe<Boolean>() { 
     @Override 
     public void subscribe(final FlowableEmitter<Boolean> e) throws Exception { 
      IMqttToken connect = client.connect(options); 
      connect.waitForCompletion(); //this is blocking and is what was required!! 
      if (client.isConnected()) { 
       e.onNext(true); 
       e.onComplete(); 
      } else { 
       e.onError(connect.getException()); 
      } 

     } 
    }, BackpressureStrategy.BUFFER); 
} 

Надежда это помогает кто-то работает с этими библиотеками :)

0
  1. Поскольку вы subscribe с помощью Consumer<String> вы не определить обработчик ошибок потока. Это означает, что ошибка будет передана обработчику ошибок по умолчанию через RxJavaPlugins.getErrorHandler().handleError(...). На андроиде этот обработчик, похоже, вызывает фатальную ошибку. Чтобы исправить это, используйте Observer<String> вместо Consumer<String>
  2. Журнал, кажется, предлагает клиенту сбой 3 раза (упоминается трижды «onFailure») за пределами Rx ничего не делает.Если бы мне пришлось угадать, что клиент может быть сдержанным, то это означает, что после первоначального подключения к последующему вызову client.connect(...) проявляется некоторая форма странного поведения, вызывающего проблему. Поскольку журнал показывает error - 1 sec wait - error, error, я думаю, что обратные вызовы остаются активными, поэтому второй сбой отправляется дважды в RxJava.
  3. Предполагая, что вы говорите о методе waitForCompletion(), когда вы говорите об синхронном, это будет поддерживать мои предположения в 2. Поскольку никаких обратных вызовов не регистрируются, каждый бросок будет только сообщаться один раз, фиксируя поведение.

Я не уверен, почему эмиттер остался бы функциональным после его завершения (onError/onComplete), но поскольку спецификация указывает, что эти методы вызываются только после того, как это может быть неуказанное поведение, вызывающее эту проблему.

+0

Я попытался с '' Observer' реализации OnError() 'в качестве абонента, за исключением также. Кроме того, причина, по которой я чувствую, что это не повторная попытка, заключается в том, что она не печатает 'apply: delay retry by seconds: 1' три раза, как если бы это исключение было синхронно выбрано из' Flowable'. Синхронно, я имею в виду не использовать обратный вызов чтобы проверить, могу ли я подключиться к брокеру MQTT. –

+0

Я думаю, что следующая последовательность событий: «onSubscribe -> onConnect (1) -> onError (1) -> timer (1) -> onSubscribe -> onConnect (2) -> onError (1) -> onError (2)». В основном исходный обратный вызов остается активным и вызывается, когда второе соединение выходит из строя, в результате чего 3 обработчика передаются обработчику ошибок. Не используя обратный вызов, что повторение событий не может произойти. – Kiskae

+0

Интересно .. Я изменил задержку каждого повтора с 1 сек до 15 секунд, чтобы избежать совпадения и имитировать что-то более близкое к синхронному вызову. У меня по-прежнему такая же ошибка. Любая идея, как мы можем справиться с такими неопределенными сценариями, где мы не знаем, сколько времени потребуется для обратного вызова? –

Смежные вопросы