2016-06-13 3 views
0

Я хочу отобразить/преобразовать объект в другой объект в фоновом потоке и получить его в основном потоке, как только завершится один разговор.Карта RxJava и испускать, как только завершается одна карта

Observable.just(1,2,3,4,5) 
      .map(new Func1<Integer, String>() { 
       @Override 
       public String call(Integer integer) { 
        Log.d(TAG, "mapping number " + integer); 
        return String.valueOf(integer) + " mapped on: " + Thread.currentThread().getName(); 
       } 
      }) 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(new Observer<String>() { 
       @Override 
       public void onCompleted() { 
        Log.d(TAG, "onCompleted on: " + Thread.currentThread().getName()); 
       } 

       @Override 
       public void onError(Throwable e) { 
       } 

       @Override 
       public void onNext(String integer) { 
        Log.d(TAG, integer + " received on: "+ Thread.currentThread().getName()); 
       } 
      }); 

Результат является:

D: mapping number 1 
D: mapping number 2 
D: mapping number 3 
D: mapping number 4 
D: mapping number 5 
D: 1 mapped on: RxNewThreadScheduler-1 received on: main 
D: 2 mapped on: RxNewThreadScheduler-1 received on: main 
D: 3 mapped on: RxNewThreadScheduler-1 received on: main 
D: 4 mapped on: RxNewThreadScheduler-1 received on: main 
D: 5 mapped on: RxNewThreadScheduler-1 received on: main 
D: onCompleted on: main 

Однако преобразование может Потребовалось некоторое время, и я ожидаю, чтобы получить их, как только преобразование будет сделано.

D: mapping number 1 
D: 1 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 2 
D: 2 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 3 
D: 3 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 4 
D: 4 mapped on: RxNewThreadScheduler-1 received on: main 
D: mapping number 5 
D: 5 mapped on: RxNewThreadScheduler-1 received on: main 
D: onCompleted on: main 
+3

В чем ваш вопрос? RxJava будет так же, как и ваше описание. Журналы в вашем вопросе - это просто потому, что преобразование происходит слишком быстро. Добавьте 'Thread.sleep (1000)' в функцию карты, и вы увидите разные журналы. – zsxwing

+0

@zsxwing yep, который сработал. – Pedram

ответ

3

Там нет необходимости устанавливать глобальный размер буфера, просто использовать observeOn(Scheduler, int) перегрузка, где вы можете указать значение prefetch равным 1. Это будет запрашивать только следующее значение, если предыдущее значение было обработано.

0

Это связано с тем, что RxJava применяет противодавление к операторам в цепи, которую вы используете выше. Операторы нисходящего потока, такие как ObserveOn, запрашивают данные из восходящего потока кусками, а не отдельные элементы для повышения эффективности. Если вы установите размер буфера на один, это будет эффективно достичь того, что можно было бы ожидать с ценой эффективности:

-Drx.ring-buffer.size=1 

В частности, что было бы совершенно ужасно для выше по потоку, которые имеют дорогие спускоподъемных звонки.

EDIT:

Вы можете использовать почтовый индекс с BehaviorSubject для своего рода сериализации вниз и вверх выбросов потока:

BehaviorSubject<Void> signal = BehaviorSubject.create(); 
signal.onNext(null); // <- pair up the signal with the first item immediately 
Observable.just(1,2,3,4,5) 
     .zipWith(signal, (item,v)->item) //only emit a next item when there is a "receipt acknowledgement" from the down stream 
     .observeOn(Schedulers.newThread()) //<- needed to avoid fetching subsequent items in UI thread 
     .map(new Func1<Integer, String>() { 
      @Override 
      public String call(Integer integer) { 
       Log.d(TAG, "mapping number " + integer); 
       return String.valueOf(integer) + " mapped on: " + Thread.currentThread().getName(); 
      } 
     }) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(new Observer<String>() { 
      @Override 
      public void onCompleted() { 
       Log.d(TAG, "onCompleted on: " + Thread.currentThread().getName()); 
      } 

      @Override 
      public void onError(Throwable e) { 
      } 

      @Override 
      public void onNext(String integer) { 
       Log.d(TAG, integer + " received on: "+ Thread.currentThread().getName()); 
       signal.onNext(null); //<- acknowledge receipt - allow emitting next item from upstream 
      } 
     }); 
Смежные вопросы