У меня есть следующий класс, который я держу в одноточечном:RxJava Тема переизлучит некорректный планировщик
public class SessionStore {
Subject<Session, Session> subject;
public SessionStore() {
subject = new SerializedSubject<>(BehaviorSubject.create(new Session());
}
public void set(Session session) {
subject.onNext(session);
}
public Observable<UserSession> observe() {
return subject.distinctUntilChanged();
}
}
В деятельности наблюдает сессию и выполнять работу сети при каждом изменении:
private Subscription init() {
return sessionStore
.observe()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1<Session, Observable<Object>>() {
@Override
public Observable<Object> call(Session session) {
return retrofitService.getAThing();
}
})
.subscribe(...);
}
Когда Я подписываюсь на хранилище сеансов, субъект испускает на io()
сразу же, как это BehaviourSubject
, а абонент выполняет по mainThread()
.
Проблема возникает, когда я звоню sessionStore.set(new AnotherSession())
, но уже подписался на нее. IMO, это должно выполнить поток, определенный в init()
в планировщике io()
. Однако вместо этого происходит то, что поток выполняется в том же потоке, на который был вызван subject.onNext()
. Результатом является NetworkOnMainThreadException
, поскольку я выполняю сетевую операцию в flatMap()
.
Насколько я понял предметы? Я неправильно их использую? Какое правильное решение, пожалуйста?
Я также попытался заменить весь предметный подход на Observable.fromEmitter()
в методе observe()
, но удивительно, что выход был тем же самым.
Спасибо, я не думал об определении планировщика для Observable, созданного внутри flatMap. Я думал, что если я определяю планировщик после плоской карты, он будет применяться ко всему потоку. – bakua
Ну, это зависит от того, чего вы пытаетесь достичь. Вы могли бы использовать функцию watchOn перед FlatMap. Каждое значение вниз по цепочке было бы перемещено от точки наблюденияOn от потока X к другому. Но вы, указав subscribeOn на наблюдаемый в flatMap, вы достигли параллелизма. Таким образом, существует разница между onSubscribe/onObserve. OnObserve может размещаться несколько раз в одном наблюдаемом, чтобы переместить выбросы в другой поток. Для subscribeOn должен быть только один для наблюдаемого, потому что будет использоваться subscribeOn, который является самым верхним. –
Принимая это во внимание, я тогда не понимаю, почему поток ведет себя по-разному, когда я удаляю 'subscribeOn()' изнутри 'flatMap()' и помещаю его на один шаг раньше. Подобно 'sessionStore.observe(). SubscribeOn(). FlatMap() ...' – bakua