2016-04-25 2 views
5

У меня есть несколько сетевых вызовов в моем приложении. Я хотел, чтобы запустить сетевой запрос в потоке ввода-вывода с помощью оператора составляет с этого трансформатора:Применение планировщиков дважды в наблюдаемой цепочке (используя compose)

public static <T> Transformer<T, T> runOnIoThread() 
{ 
    return tObservable -> tObservable.subscribeOn(Schedulers.io()) 
     .observeOn(AndroidSchedulers.mainThread()); 
} 

Это, кажется, работает хорошо, пока у меня есть только один вызов сети. Однако, если я их свяжу, как в следующем примере, я получаю AndroidInterInMainThreadException Android.

public Observable<String> networkCall1() 
{ 
    return <NETWORK_CALL_1()> 
      .compose(runOnIoThread()); 
} 

public Observable<String> networkCall2(String input) 
{ 
    return <NETWORK_CALL_2(input)> 
      .compose(runOnIoThread()); 
} 

public Observable<String> chainedCalls() 
{ 
    return networkCall1() 
      .flatMap(result1 -> networkCall2(result1)); 
} 

Моя идея до того, что compose прикладывается к полной наблюдаемой цепи перед вызовом, и что позже compose вызовы будут «перезаписать» поведение предыдущего. Но на самом деле это выглядит так: observeOn вызов первого compose (observeOn основной поток) доминирует над вторым compose call (поток IO). Одним из очевидных решений было бы иметь две версии networkCall1 - один, который применяет планировщики, а другой - нет. Однако это сделает мой код довольно многословным.

Вы знаете лучшие решения? Можете ли вы объяснить поведение применения планировщиков дважды (с составлением) в наблюдаемой цепочке?

Редактировать: Я использую retrofit с RxJava для своих сетевых вызовов.

ответ

7

Вы можете использовать только subscribeOn() один раз в потоке. Если вы используете его во второй раз, он ничего не сделает. Таким образом, когда вы выстроив цепочку из двух методов вместе запуска:

observeOn(AndroidSchedulers.mainThread())

, который переключает операцию к основному потоку. После этого он остается там, потому что следующий subscribeOn() фактически игнорируется.

Я бы предположил, что вы на самом деле слишком сложны с помощью своего метода компоновки. Просто добавьте

subscribeOn(Schedulers.io())

Для обоих сетевых вызовов, а затем использовать

observeOn(AndroidSchedulers.mainThread())

Просто прежде чем вы хотите, чтобы обработать результаты на главном потоке. Вы бы в конечном итоге с чем-то вроде:

public Observable<String> networkCall1() 
{ 
    return <NETWORK_CALL_1()> 
      .subscribeOn(Schedulers.io); 
} 

public Observable<String> networkCall2(String input) 
{ 
    return <NETWORK_CALL_2(input)> 
      .subscribeOn(Schedulers.io); 
} 

public Observable<String> chainedCalls() 
{ 
    return networkCall1() 
      .flatMap(result1 -> networkCall2(result1)) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

EDIT

Если вы действительно хотите иметь observeOn() вызов методов отдельных сетевых вызовов вы можете. Вам нужно добавить дополнительный observeOn() к вашему методу chainedCalls(). Вы можете иметь столько звонков observeOn(), сколько хотите. Это было бы примерно так:

public Observable<String> networkCall1() 
{ 
    return <NETWORK_CALL_1()> 
      .subscribeOn(Schedulers.io) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

public Observable<String> networkCall2(String input) 
{ 
    return <NETWORK_CALL_2(input)> 
      .subscribeOn(Schedulers.io) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 

public Observable<String> chainedCalls() 
{ 
    return networkCall1() 
      .observeOn(Schedulers.io) 
      .flatMap(result1 -> networkCall2(result1)) 
      .observeOn(AndroidSchedulers.mainThread()); 
} 
+0

Hi Jahnold! Спасибо! Ваше объяснение помогло мне лучше понять подписку, наблюдать и сочинять. Поскольку мои методы будут частью библиотеки, я хотел бы сделать звонки как можно более легкими (т. Е. Экономя вызов watchOn). ДжекWharton написал [здесь] (http: // stackoverflow.com/a/21010181/2011622) об обновлении с RxJava, чтобы он выполнял вызовы в потоке ввода/вывода и наблюдал, что он находится в потоке вызывающего абонента. Однако цепочка между двумя вызовами по-прежнему возможна. Значит, должен быть способ сделать это, верно? –

+0

Я обновил ответ, чтобы показать, как вы можете использовать дополнительный «watchOn», если вы действительно хотите, чтобы все они были частью вызовов. – Jahnold

+1

Спасибо за сотрудничество! Поэтому, в то время как 'subscribeOn' может использоваться только один раз в потоке,' observOn' может использоваться несколько раз для переключения на разные потоки. Я не мог найти эту информацию в документации. Как ты это узнал? У вас есть ссылка? –

Смежные вопросы