2017-02-06 5 views
2

Я довольно новичок в RxJava и, как и многие другие, пытаюсь разобраться с обработкой исключений. Я прочитал довольно много сообщений в Интернете (например, здесь обсуждаются how to handle exceptions thrown by observer's onNext) и думаю, что я получаю основную идею концепций.RxJava 2.0 - обработка ressources для uncaught subsciber error в публикации() refCount()

В вышеупомянутом обсуждении, один из плакатов гласит, что, когда исключение в подписчика, RxJava выполняет следующие действия:

Реализовать родовую обработку для входа отказ и прекратить передачу его события (любого вида) и очистить любые ресурсы из-за этого абонента и нести с любыми оставшимися подписками.

Это также более или менее то, что я вижу, единственное, с чем я столкнулся, это бит «очистить любой ressources». Чтобы сделать это ясно, давайте предположим следующий пример:

Я хочу создать наблюдаемое, которое прослушивает источник событий async (например, очередь JMS) и onNext() s для каждого полученного сообщения. Таким образом, в (псевдо-) код, который я бы сделать что-то похожее на это:

Observable<String> observable = Observable.create(s -> { 
    createConnectionToBroker(); 
    getConsumer().setMessageListener(message -> s.onNext(transform(message))); 
    s.setDisposable(new Disposable() { 
    public void dispose() { 
     tearDownBrokerConnection(); 
    } 
    }); 
}); 

Так как я хочу, чтобы повторно использовать сообщение слушателя для многих абонентов/наблюдателей, я не сразу подписаться на созданный Observable, но использовать вместо команды publish(). refCount(). Нечто похожее на это:

Observable<String> observableToSubscribeTo = observable.publish().refCount(); 

Disposable d1 = observableToSubscribeTo.subscribe(s -> ...); 
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...); 

Это все работает как ожидалось. Код подключается к JMS только при установке первой подписки, и соединение с брокером закрывается, когда последний наблюдатель dispose() d.

Однако, когда абонент выбрасывает исключение, когда он onNext() ed, кажется, что он запутался. Как и ожидалось, наблюдатель, который бросил, был уничтожен, и всякий раз, когда публикуется новое событие, он больше не будет уведомлен. Моя проблема заключается в том, что, когда все остальные подписчики имеют dispose() d, Observable, поддерживающий соединение с брокером сообщений, больше не уведомляется. Мне кажется, что абонент, который выбрал исключение, находится в каком-то состоянии зомби. Он игнорируется, когда дело доходит до распределения событий, но оно каким-то образом предотвращает получение уведомления об обнаружении корня Observable, когда последний абонент dispose() d.

Я понимаю, что RxJava ожидает, что наблюдатели обязательно не бросят, а обработают возможное исключение должным образом. К сожалению, в случае, когда я хочу предоставить библиотеку, которая возвращает наблюдаемый вызывающему, я не могу контролировать своих подписчиков вообще. Это означает, что я никогда не смогу защитить свою библиотеку от глупых наблюдателей.

Итак, я спрашиваю себя: я что-то упустил? Неужели нет возможности правильно очищать, когда абонент бросает? Является ли это ошибкой или я просто не понимаю библиотеку?

Любые идеи очень ценятся!

ответ

1

Если бы вы могли показать некоторые модульные тесты, которые демонстрируют проблему (без необходимости JMS), это было бы здорово.

Кроме того, onNext в RxJava 2 никогда не должен бросать; если это так, это неопределенное поведение.Если вы не доверяете своим потребителям, вы можете иметь конечный наблюдаемым трансформатор, который делает safeSubscribe вместо простого subscribe, добавляющий защиты от воздействия плохо себя вниз по течению:

.compose(o -> v -> o.safeSubscribe(v)) 

или

.compose(new ObservableTransformer<T>() { 
    @Override public Observable<T> apply(final Observable<T> source) { 
     return new Observable<T>() { 
      @Override public void subscribeActual(Observer<? super T> observer) { 
       source.safeSubscribe(observer); 
      } 
     }; 
    } 
}) 
+1

привет @akarnokd, спасибо за изучение этого! Я просто написал базовый модульный тест, чтобы продемонстрировать поведение, просто чтобы узнать, что я больше не могу его воспроизвести?!?! : -o Довольно смущающе, я должен сказать :-(Так что я могу просто предположить, что проблема была где-то в моем коде клея. Извиняюсь за то, что тратила ваше время !!! И спасибо за комментарии относительно safeSubscribe(). библиотеке, которой вы никогда не доверяете используемому коду, я бы очень хотел ее ожесточить. Один неверный абонент не должен иметь никаких побочных эффектов для других хорошо себя работающих. –

Смежные вопросы