2016-10-29 6 views
1

Я ищу способ создания Наблюдаемый после обработки результата в subscribe.RxJava как создать Наблюдаемый из подписки

Учитывая я это Наблюдаемые из productRepo.list(), который дооснащения возвращение Observable<Response<ProductResponse>>.

productRepo 
    .list() 
    .retry(3) 
    .subscribe { response -> 
     if (response.isSuccessful) { 
      response.body().apply { 
       cache.saveProducts(data) 
      } 
     } 
    } 

Целью этого является сохранение результата в локальную БД cache. Этот плюс еще один очень похожий вызов заполняет локальную БД удаленными данными из API.

После завершения двух вызовов я хотел загрузить данные из cache.

Я не хочу сочетать оба наблюдаемых в любом случае. Просто захотите выполнить некоторую задачу позже.

Я хочу эту обработку как единицу в графе вызова Rx, чтобы он одновременно вызывал Call1 и Call2 и один раз Call1 и Call2 завершили запуск Task3. Каков наилучший способ в этом сценарии? Я действительно предпочитаю, если абонент для каждого звонка будет разделен.

flatMap лучший вариант здесь?

ответ

0

Как уже упоминалось,

Я действительно предпочитаю, если абонент для каждого вызова отделено.

Предположим, мы имеем два наблюдаемыми

val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) 

val call2 = Observable.from(arrayOf(2,4,6,8)) 

Если мы просто использовать Observable.zip следующим образом, он может только имеет одного подписчика для обоего два call1 & вызова 2.

Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Если мы используем три отдельных абонентов, следующие за вызов 2 поток call1 & будет срабатывать дважды.

call1.subscribe(call1Subscriber) 

call2.subscribe(call2Subscriber) 

Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber) 

Поэтому мы должны использовать .share().cacheWithInitialCapacity(1) делать трюки

val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) 
    .share() 
    .cacheWithInitialCapacity(1) 

val call2 = Observable.from(arrayOf(2,4,6,8)) 
    .share() 
    .cacheWithInitialCapacity(1) 

val task3Signal = Observable.zip(call1,call2){ c1, c2 -> 
    c1 + c2 
} 
call1.subscribe(call1Subscriber) 
call2.subscribe(call2Subscriber) 
task3Signal.subscribe(task3Subscriber) 

Вы также можете доказать/протестировать концепцию графа Rx от простого теста.

class SimpleJUnitTest { 

    @Test 
    fun test(){ 

    val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8)) 
     .doOnNext { println("call1 doOnNext $it") } 
     .share() 
     .cacheWithInitialCapacity(1) 

    val call2 = Observable.from(arrayOf(2,4,6,8)) 
     .doOnNext { println("call2 doOnNext $it") } 
     .share() 
     .cacheWithInitialCapacity(1) 

    val task3Signal = Observable.zip(call1,call2){ c1, c2 -> 
     println("task3Signal c1:$c1, c2: $c2") 
     c1 + c2 
    } 

    val testSubscriber1 = TestSubscriber<Int>() 
    val testSubscriber2 = TestSubscriber<Int>() 
    val testSubscriber3 = TestSubscriber<Int>() 
    call1.subscribe(testSubscriber1) 
    call2.subscribe(testSubscriber2) 
    task3Signal.subscribe(testSubscriber3) 

    testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8)) 
    testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8)) 
    testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12)) 
    testSubscriber1.assertValueCount(8) 
    testSubscriber2.assertValueCount(4) 
    testSubscriber3.assertValueCount(4) 


    } 
} 

Выход:

call1 doOnNext 1 
call1 doOnNext 2 
call1 doOnNext 3 
call1 doOnNext 4 
call1 doOnNext 5 
call1 doOnNext 6 
call1 doOnNext 7 
call1 doOnNext 8 
call2 doOnNext 2 
call2 doOnNext 4 
call2 doOnNext 6 
call2 doOnNext 8 
task3Signal c1:1, c2: 2 
task3Signal c1:2, c2: 4 
task3Signal c1:3, c2: 6 
task3Signal c1:4, c2: 8 
+0

Спасибо за подробный пример. Приветствую вашу помощь. Я закончил использование doOnNext для кеширования, а затем застегнул оба наблюдаемых в один. Это делает метод подписи многословным, но он будет делать. – RobGThai

1

Посмотрите на Zip. Есть ли что-то подобное, Observable.zip (firstObservable, secondObservable, ..... {Задача 3}

+0

Zip бы объединить их в единый поток, который не совсем то, что я ищу. – RobGThai

+0

Я ошибся с Zip. Я тоже использовал его. Я перепутал с 'merge'. Виноват! – RobGThai

3
.doOnNext() 

ваш ответ, потому что будет возвращать ваш окончательный ответ или каждый ответ, если есть несколько. Есть попытки.

Смежные вопросы