2015-09-03 4 views
9

Я использую Retrofit и RxJava для выполнения некоторых фоновых задач. Код выглядит следующим образом:Как выполнить flatMap на фоновом потоке

public class MyLoader{ 
    public Observable<MyData> getMyData(){ 
     return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() { 
       @Override 
       public Observable<MyData> call(MyHelper myHelper) { 
        return queryData(myHelper); 
       } 
     }); 
    } 

    private Observable<MyData> queryData(MyHelper myHelper){ 
     ... 
    } 

    private Observable<MyHelper> setupHelper(){ 
    return Observable.create(new Observable.OnSubscribe<MyHelper>() { 
      @Override 
      public void call(final Subscriber<? super MyHelper> subscriber) { 
       try{ 
       MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data 
       subscriber.onNext(helper); 
       subscriber.onCompleted(); 
       }catch(RetrofitError e){ 
       subscriber.onError(e) 
       } 
      } 
    } 
    } 
} 

Это терпит неудачу с RetrofitError, из-за NetworkOnMainThread Исключение в этой строке:

MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data 

подписавшись на мой Observable:

myLoader.getMyData() 
       .subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(new Subscriber<MyData>() { 
        @Override 
        public void onCompleted() { 

        } 

        @Override 
        public void onError(Throwable e) { 

        } 

        @Override 
        public void onNext(MyData inventory) { 

        } 
       }); 

Согласно Rx документации flatMap Безразлично работайте с любым фоновым потоком. Мой вопрос заключается в том, как обеспечить, чтобы весь метод getMyData() выполнялся в фоновом режиме.

+2

На первый взгляд, я не вижу ничего плохого. Не могли бы вы попытаться поставить '.subscribeOn (Schedulers.io())' в 'setupHelper' вместо этого? – akarnokd

+0

Дооснащение уже интегрировано с RxJava и способно напрямую возвращать Observables, поэтому вам не нужно создавать собственные наблюдательные команды, выполняющие запросы. – BladeCoder

+0

Вы нашли другой подход, чем добавление '.subscribeOn()' во втором наблюдаемом? – Juancho

ответ

0

Есть хороший шанс, когда вы создаете объект MyLoader в основном потоке, который должен выполняться Observable.create (или, может быть, где-то еще в вашем коде (?)). Если это так, то .subscribeOn(Schedulers.io()) не повлияет на изменение потока.

Вы можете попробовать обернуть .create() с помощью .defer(), чтобы убедиться, что Observable создан только тогда, когда он подписан.

например. defer(() -> create(....))

3


Я просто добавить observeOn(Schedulers.newThread()) перед тем flatMap и это работает!

+0

Добро пожаловать на сайт! [Пожалуйста, используйте ответы исключительно для ответа на вопрос] (// meta.stackoverflow.com/q/92107). Если у вас есть другой вопрос, пожалуйста, спросите его, нажав кнопку [«Спросить вопрос» в верхней части страницы] (// stackoverflow.com/questions/ask). Если у вас есть такая же проблема, как и эта, когда у вас достаточно репутации, вы можете [перепроверить вопрос] (// stackoverflow.com/privileges/vote-up) или [добавить баунти на него] (// stackoverflow .com/помощь/Баунти). Вы также можете «зажечь» вопрос в качестве фаворита, и в этом случае система уведомит вас о любых новых ответах. – Mogsdad

+2

Почему мой ответ не принят? Я не вижу ничего плохого. Может ли кто-нибудь сказать мне, почему? – initialjie

+0

Я думаю, это связано с тем, что, вызывая функцию watchOn перед FlatMap, вы переключаете все на фоновые потоки, а не только на один шаг на конвейере. –

1

Это перемещает только один шаг в трубопроводе к фоновому потоку:

Observable<Integer> vals = Observable.range(1,10); 

vals.flatMap(val -> Observable.just(val) 
      .subscribeOn(Schedulers.computation()) 
      .map(i -> intenseCalculation(i)) 
).subscribe(val -> System.out.println(val)); 

Первоначально здесь ответа: https://stackoverflow.com/a/35429084/2908525