Я пытаюсь подключиться к брокеру MQTT. Я хочу повторить попытку, если я не смогу подключиться. Я получаю обратный вызов успеха или отказа подключения.RxJava2: невозможно обработать исключение для асинхронного обратного вызова с использованием retryWhen
После прочтения нескольких примеров retryПри обработке асинхронных обратных вызовов я собрал этот код. Он отлично работает, если мне удастся подключиться. Кроме того, он повторяет три раза, если я позвоню e.onError(throwable)
синхронно с Flowable
. Но это сбой моего приложения для Android, если я вызываю e.onError(throwable)
из метода onFailure()
callback.
Вот код:
RxJava цепь
createConnectionFlowable(client, options)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(createRetryFunction())
.subscribe(createConsumer());
создать текучий
private Flowable<String> createConnectionFlowable(final MqttAndroidClient client, final MqttConnectOptions options) {
return Flowable.create(new FlowableOnSubscribe<String>() {
public void subscribe(final FlowableEmitter<String> e) throws Exception {
client.connect(options).setActionCallback(new IMqttActionListener() {
public void onSuccess(IMqttToken iMqttToken) { e.onComplete(); }
public void onFailure(IMqttToken iMqttToken, Throwable throwable) { e.onError(throwable); }
});
}
}, BackpressureStrategy.BUFFER);
}
создать функцию Retry
private Function<Flowable<Throwable>, Publisher<?>> createRetryFunction() {
return new Function<Flowable<Throwable>, Publisher<?>>() {
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.zipWith(
Flowable.range(1, 3),
new BiFunction<Throwable, Integer, Integer>() {
public Integer apply(Throwable throwable, Integer integer) throws Exception { return integer; }
}
)
.flatMap(new Function<Integer, Publisher<?>>() {
public Publisher<?> apply(Integer integer) throws Exception {
return Flowable.timer(integer, TimeUnit.SECONDS);
}
});
}
};
}
Потребитель: сделать все хорошие вещи здесь
private Consumer<String> createConsumer() {
return new Consumer<String>() {
public void accept(String s) throws Exception {
Log.d(TAG, "accept: do important stuff here" + s);
}
};
}
журналы ошибок
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply() called with: throwable = [Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)], integer = [1]
12-20 11:51:08.544 16769-16769/com.work.app D/MqttBridgeService: apply: delay retry by seconds:1
12-20 11:51:09.589 16769-16830/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.600 16769-16831/com.work.app D/AlarmPingSender: Unregister alarmreceiver to MqttServicepaho837944119000
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app D/MqttBridgeService: onFailure: connection unsuccessful
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.lang.Thread.run(Thread.java:818)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:234)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connectErrno(IoBridge.java:171)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.connect(IoBridge.java:122)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:183)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:452)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at java.net.Socket.connect(Socket.java:884)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 2 more
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: Caused by: android.system.ErrnoException: isConnected failed: ECONNREFUSED (Connection refused)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: at libcore.io.IoBridge.isConnected(IoBridge.java:223)
12-20 11:51:09.606 16769-16769/com.work.app W/System.err: ... 8 more
12-20 11:51:09.606 16769-16769/com.work.app E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.work.app, PID: 16769
Unable to connect to server (32103) - java.net.ConnectException: failed to connect to /10.31.252.211 (port 1883) after 30000ms:
Вопросы
- Почему этот код сгенерирует исключение, которое происходит сбой приложения? В идеале, он должен обрабатывать исключение? Что мне здесь не хватает?
- Почему он не повторяет попытку 3 раза?
- Почему такой же код повторяет попытку, если я звоню
e.onError(throwable)
синхронно из методаFlowable.subscribe()
?
Ссылки
Я попытался с '' Observer' реализации OnError() 'в качестве абонента, за исключением также. Кроме того, причина, по которой я чувствую, что это не повторная попытка, заключается в том, что она не печатает 'apply: delay retry by seconds: 1' три раза, как если бы это исключение было синхронно выбрано из' Flowable'. Синхронно, я имею в виду не использовать обратный вызов чтобы проверить, могу ли я подключиться к брокеру MQTT. –
Я думаю, что следующая последовательность событий: «onSubscribe -> onConnect (1) -> onError (1) -> timer (1) -> onSubscribe -> onConnect (2) -> onError (1) -> onError (2)». В основном исходный обратный вызов остается активным и вызывается, когда второе соединение выходит из строя, в результате чего 3 обработчика передаются обработчику ошибок. Не используя обратный вызов, что повторение событий не может произойти. – Kiskae
Интересно .. Я изменил задержку каждого повтора с 1 сек до 15 секунд, чтобы избежать совпадения и имитировать что-то более близкое к синхронному вызову. У меня по-прежнему такая же ошибка. Любая идея, как мы можем справиться с такими неопределенными сценариями, где мы не знаем, сколько времени потребуется для обратного вызова? –