2016-02-27 1 views
4

Я использовал RxJava в андроиде с Retrofit 2, и я активировал subscribeOn (Schedulers.io()) android observOn (AndroidSchedulers.mainThread()) глобальный, прежде чем подписаться(). Однако иногда мне хотелось бы вызвать subscribeOn (Schedulers.immediate()) android observOn (Schedulers.immediate()), чтобы переопределить установленные планировщики, чтобы получить синхронизированный процесс. Но я нашел, что это не сработает, работы андроида будут по-прежнему обрабатываться в потоке io(), результат андроида обрабатывается mainThread(). Почему?RxJava subscribeOn и наблюдатьНе переопределять исходный планировщик раньше?

+0

Итак, все должно работать. Было бы лучше, если бы вы показали пример кода. – eleven

ответ

6

Это так, как работает RxJava.

Посмотрите на this video tutorial, начиная с отметки 12:50. Поэтому, учитывая пример в видео:

Observable.just(1, 2, 3) 
    .subscribeOn(Schedulers.newThread()) 
    .subscribeOn(Schedulers.io()) 
    .subscribe(System.out::println); 

Что происходит, что subscribeOn() гнезда всех звонков. В этом случае сначала генерируется subscribeOn(Schedulers.io()) и подписывается все, что находится над ним на потоке io. Но затем появляется subscribeOn(Schedulers.newThread()), и он занимает приоритет (так как он был назван последним), чтобы подписываться на него на нем. Не существует цепочки нитей. В этом примере вы, по сути, порождаете нить io без уважительной причины.

Чтобы лучше обрабатывать методы subscribeOn() и observeOn(), я предлагаю вам взглянуть на this post от того же автора видео. То, что он предлагает, чтобы использовать Transformer обернуть вызов этих методов:

Transformer на самом деле просто Func1<Observable<T>, Observable<R>>. В другими словами: подайте его Observable одного типа, и он вернет Observable другого. Это точно так же, как вызов серии операторов inline.

Таким образом, вы можете иметь метод следующим образом:

<T> Transformer<T, T> applySchedulers() { 
    return observable -> observable.subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()); 
} 

Или, если вы хотите использовать свои трансформаторы, вы можете иметь следующие настройки:

final Transformer schedulersTransformer = 
    observable -> observable.subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()); 

@SuppressWarnings("unchecked") 
<T> Transformer<T, T> applySchedulers() { 
    return (Transformer<T, T>) schedulersTransformer; 
} 

Тогда выше пример будет выглядеть так:

Observable.just(1, 2, 3) 
    .compose(applySchedulers()) 
    .subscribe(System.out::println); 

Надеюсь, что это поможет.

+1

Большое спасибо, ваш ответ действительно помогает мне. После чтения исходного кода. Я нашел Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread()). SubscribeOn (Schedulers.io()). Subscribe (System.out :: println); будет вилка 3 потоков, originalThread -> newThread -> ioThread. Итак, наконец, метод prinln будет вызываться в ioThread, который создается из newThread, а newThread создается из originalThread. – AtanL

+0

Кроме того, в дополнение к этому есть статья, которую написал Дэн Лью, которая охватывает это: http://blog.danlew.net/2015/03/02/dont-break-the-chain/ – w3bshark