2016-10-30 2 views
1

У меня есть следующий код, который отлично работал в RxJava 1.6.Подписаться на PublishSubject в RxJava 2.0

private PublishSubject<Boolean> mConnectionSubject; 
... 
Observable.create(subscriber -> { 
     mConnectionSubject.subscribe(subscriber); 
     ... 
    }); 

Но после перехода на RxJava 2.0 этот код не компилируется. Причина в том, что Observable.create() теперь принимает в качестве аргумента ObservableEmitter. Но PublishSubject не принимает ObservableEmitter. Он принимает только интерфейс Consumer или Observer.

Получу любые предложения.

ответ

1

Я бы сделал что-то вроде ниже в упомянутом вами сценарии.

Observable.<Boolean>create(emitter -> mConnectionSubject.subscribe(
    emitter::onNext, 
    emitter::onError, 
    emitter::onComplete 
)); 
+1

Ваше предложение хорошее, также у меня есть ответ на эту проблему на GitHub https://github.com/ReactiveX/RxJava/issues/4787. –

0

Вам не нужно создавать новый наблюдаемыми, это должно работать:

private PublishSubject<Boolean> mConnectionSubject; 
... 
mConnectionSubject.doOnNext(...).observeOn(...).subscribe(); 

Вы можете цепь несколько операторов/подписки на PublishSubject.

+0

Это не ответит на вопрос, извините. Я не знаю, как использовать ObservableEmitter для будущих подписчиков. Поскольку теперь int RxJava 2.0 отличается от подхода к реализации, как это было раньше. –

Смежные вопросы