TL; DR: move observeOn(AndroidSchedulers.mainThread())
filter(...)
.
subscribeOn(...)
используется для обозначения на котором Навинчивайте Observable
будет начать работает на. Последующие вызовы subscribeOn
будут проигнорированы.
Таким образом, если вы должны были написать следующее, все будет выполнен на Schedulers.newThread()
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
Сейчас, конечно, это не то, что вы хотите: вы хотите doSomething
в главном потоке ,
Здесь находится observeOn
. Все действия послеobserveOn
выполнены на этом планировщике. Поэтому в вашем примере в основном потоке выполняется filter
.
Вместо двигаться observeOn
вниз как раз перед subscribe
:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
Теперь filter
будет происходить на «новом потоке», и doSomething
в основном потоке.
пойти еще дальше, вы можете использовать observeOn
несколько раз:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1) });
В этом случае выборка будет происходить в новом потоке, фильтрацию на вычислительном потоке и doSomething
на основной поток.
Оформить заказ ReactiveX - SubscribeOn operator для официальной документации.
Спасибо за подробный ответ! Есть ли способ определить планировщики в начале цепочки вызовов метода (например, в OP)? – WonderCsabo
Наблюдатели следуют [Декоратору] (https://en.wikipedia.org/wiki/Decorator_pattern), поэтому по дизайну это невозможно. 'observOn' (и другие методы) не возвращают тот же экземпляр' Observable', что и шаблон компоновщика, но вместо этого возвращают _new_ 'Observable' 'wrapping' 'old' 'Observable'. – nhaarman
Я понимаю это. Это несчастливо для случая, потому что это означает, что я должен добавить вызов 'observOn()' ко всем потокам, которые я пишу? :( – WonderCsabo