2015-10-27 3 views
14

Я использую RxAndroid для потоков. В моем реальном случае использования я получаю список с сервера (используя Retrofit). Я использую планировщики для работы над фоновым потоком и получаю окончательную эмиссию в потоке Android UI (основной).Процесс Наблюдаемый на фоновом потоке

Это хорошо работает для сетевого вызова, однако я понял, что мои операторы после сетевого вызова не используют фоновый поток, а получают вызов в основном потоке.

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .subscribe(integer1 -> {}); 

Как я могу убедиться, что все операции выполняются на фоновом потоке?

ответ

39

TL; DR: move observeOn(AndroidSchedulers.mainThread())filter(...).


subscribeOn(...) используется для обозначения на котором Навинчивайте Observable будет начать работает на. Последующие вызовы subscribeOn будут проигнорированы.

Таким образом, если вы должны были написать следующее, все будет выполнен на Schedulers.newThread():

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .subscribe(integer1 -> { doSomething(integer1); }); 

Сейчас, конечно, это не то, что вы хотите: вы хотите doSomething в главном потоке ,
Здесь находится observeOn. Все действия послеobserveOn выполнены на этом планировщике. Поэтому в вашем примере в основном потоке выполняется filter.

Вместо двигаться observeOn вниз как раз перед subscribe:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(integer1 -> { doSomething(integer1) }); 

Теперь filter будет происходить на «новом потоке», и doSomething в основном потоке.


пойти еще дальше, вы можете использовать observeOn несколько раз:

myService.fetchSomeIntegersFromServer() 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(Schedulers.computation()) 
     .filter(integer -> { 
      System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread()); 
      return true; 
     }) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(integer1 -> { doSomething(integer1) }); 

В этом случае выборка будет происходить в новом потоке, фильтрацию на вычислительном потоке и doSomething на основной поток.

Оформить заказ ReactiveX - SubscribeOn operator для официальной документации.

+0

Спасибо за подробный ответ! Есть ли способ определить планировщики в начале цепочки вызовов метода (например, в OP)? – WonderCsabo

+0

Наблюдатели следуют [Декоратору] (https://en.wikipedia.org/wiki/Decorator_pattern), поэтому по дизайну это невозможно. 'observOn' (и другие методы) не возвращают тот же экземпляр' Observable', что и шаблон компоновщика, но вместо этого возвращают _new_ 'Observable' 'wrapping' 'old' 'Observable'. – nhaarman

+0

Я понимаю это. Это несчастливо для случая, потому что это означает, что я должен добавить вызов 'observOn()' ко всем потокам, которые я пишу? :( – WonderCsabo

Смежные вопросы