Я довольно новичок в RxJava и, как и многие другие, пытаюсь разобраться с обработкой исключений. Я прочитал довольно много сообщений в Интернете (например, здесь обсуждаются how to handle exceptions thrown by observer's onNext) и думаю, что я получаю основную идею концепций.RxJava 2.0 - обработка ressources для uncaught subsciber error в публикации() refCount()
В вышеупомянутом обсуждении, один из плакатов гласит, что, когда исключение в подписчика, RxJava выполняет следующие действия:
Реализовать родовую обработку для входа отказ и прекратить передачу его события (любого вида) и очистить любые ресурсы из-за этого абонента и нести с любыми оставшимися подписками.
Это также более или менее то, что я вижу, единственное, с чем я столкнулся, это бит «очистить любой ressources». Чтобы сделать это ясно, давайте предположим следующий пример:
Я хочу создать наблюдаемое, которое прослушивает источник событий async (например, очередь JMS) и onNext() s для каждого полученного сообщения. Таким образом, в (псевдо-) код, который я бы сделать что-то похожее на это:
Observable<String> observable = Observable.create(s -> {
createConnectionToBroker();
getConsumer().setMessageListener(message -> s.onNext(transform(message)));
s.setDisposable(new Disposable() {
public void dispose() {
tearDownBrokerConnection();
}
});
});
Так как я хочу, чтобы повторно использовать сообщение слушателя для многих абонентов/наблюдателей, я не сразу подписаться на созданный Observable, но использовать вместо команды publish(). refCount(). Нечто похожее на это:
Observable<String> observableToSubscribeTo = observable.publish().refCount();
Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);
Это все работает как ожидалось. Код подключается к JMS только при установке первой подписки, и соединение с брокером закрывается, когда последний наблюдатель dispose()
d.
Однако, когда абонент выбрасывает исключение, когда он onNext()
ed, кажется, что он запутался. Как и ожидалось, наблюдатель, который бросил, был уничтожен, и всякий раз, когда публикуется новое событие, он больше не будет уведомлен. Моя проблема заключается в том, что, когда все остальные подписчики имеют dispose()
d, Observable, поддерживающий соединение с брокером сообщений, больше не уведомляется. Мне кажется, что абонент, который выбрал исключение, находится в каком-то состоянии зомби. Он игнорируется, когда дело доходит до распределения событий, но оно каким-то образом предотвращает получение уведомления об обнаружении корня Observable, когда последний абонент dispose()
d.
Я понимаю, что RxJava ожидает, что наблюдатели обязательно не бросят, а обработают возможное исключение должным образом. К сожалению, в случае, когда я хочу предоставить библиотеку, которая возвращает наблюдаемый вызывающему, я не могу контролировать своих подписчиков вообще. Это означает, что я никогда не смогу защитить свою библиотеку от глупых наблюдателей.
Итак, я спрашиваю себя: я что-то упустил? Неужели нет возможности правильно очищать, когда абонент бросает? Является ли это ошибкой или я просто не понимаю библиотеку?
Любые идеи очень ценятся!
привет @akarnokd, спасибо за изучение этого! Я просто написал базовый модульный тест, чтобы продемонстрировать поведение, просто чтобы узнать, что я больше не могу его воспроизвести?!?! : -o Довольно смущающе, я должен сказать :-(Так что я могу просто предположить, что проблема была где-то в моем коде клея. Извиняюсь за то, что тратила ваше время !!! И спасибо за комментарии относительно safeSubscribe(). библиотеке, которой вы никогда не доверяете используемому коду, я бы очень хотел ее ожесточить. Один неверный абонент не должен иметь никаких побочных эффектов для других хорошо себя работающих. –