Как уже упоминалось,
Я действительно предпочитаю, если абонент для каждого вызова отделено.
Предположим, мы имеем два наблюдаемыми
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
val call2 = Observable.from(arrayOf(2,4,6,8))
Если мы просто использовать Observable.zip
следующим образом, он может только имеет одного подписчика для обоего два call1 & вызова 2.
Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
Если мы используем три отдельных абонентов, следующие за вызов 2 поток call1 & будет срабатывать дважды.
call1.subscribe(call1Subscriber)
call2.subscribe(call2Subscriber)
Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
Поэтому мы должны использовать .share().cacheWithInitialCapacity(1)
делать трюки
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
.share()
.cacheWithInitialCapacity(1)
val call2 = Observable.from(arrayOf(2,4,6,8))
.share()
.cacheWithInitialCapacity(1)
val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
c1 + c2
}
call1.subscribe(call1Subscriber)
call2.subscribe(call2Subscriber)
task3Signal.subscribe(task3Subscriber)
Вы также можете доказать/протестировать концепцию графа Rx от простого теста.
class SimpleJUnitTest {
@Test
fun test(){
val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
.doOnNext { println("call1 doOnNext $it") }
.share()
.cacheWithInitialCapacity(1)
val call2 = Observable.from(arrayOf(2,4,6,8))
.doOnNext { println("call2 doOnNext $it") }
.share()
.cacheWithInitialCapacity(1)
val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
println("task3Signal c1:$c1, c2: $c2")
c1 + c2
}
val testSubscriber1 = TestSubscriber<Int>()
val testSubscriber2 = TestSubscriber<Int>()
val testSubscriber3 = TestSubscriber<Int>()
call1.subscribe(testSubscriber1)
call2.subscribe(testSubscriber2)
task3Signal.subscribe(testSubscriber3)
testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8))
testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8))
testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12))
testSubscriber1.assertValueCount(8)
testSubscriber2.assertValueCount(4)
testSubscriber3.assertValueCount(4)
}
}
Выход:
call1 doOnNext 1
call1 doOnNext 2
call1 doOnNext 3
call1 doOnNext 4
call1 doOnNext 5
call1 doOnNext 6
call1 doOnNext 7
call1 doOnNext 8
call2 doOnNext 2
call2 doOnNext 4
call2 doOnNext 6
call2 doOnNext 8
task3Signal c1:1, c2: 2
task3Signal c1:2, c2: 4
task3Signal c1:3, c2: 6
task3Signal c1:4, c2: 8
Спасибо за подробный пример. Приветствую вашу помощь. Я закончил использование doOnNext для кеширования, а затем застегнул оба наблюдаемых в один. Это делает метод подписи многословным, но он будет делать. – RobGThai