В приведенном ниже примере кода я пытаюсь извлечь из удаленного источника 1 и удаленного источника2 - для обоих из них требуется вход с удаленного источника. Я хотел бы получить из source0 только один раз и извлечь из source1 & source2 одновременно.Как делиться RxJava параллельно наблюдаемым?
public class TestReactiveX5 {
public static void main(String[] args) throws Exception {
Scheduler scheduler = Schedulers.io();
RemoteSourceData<Integer, Integer> source0 = new RemoteSource0();
RemoteSourceData<Integer, String> source1 = new RemoteSource1();
RemoteSourceData<Integer, String> source2 = new RemoteSource2();
Observable<Integer> i = Observable.just(1);
Observable<Integer> i0 = i.map(input -> source0.getData(input)).share();
Observable<String> fooSource = i0.map(input0 -> source1.getData(input0)).subscribeOn(scheduler);
Observable<String> barSource = i0.map(input0 -> source2.getData(input0)).subscribeOn(scheduler);
Observable<List<String>> merged = Observable.merge(fooSource, barSource).toList();
merged.subscribe(val -> System.out.println(val));
Thread.sleep(15000);
}
@FunctionalInterface
private static interface RemoteSourceData<IN, OUT> {
public OUT getData(IN input);
}
public static class RemoteSource0 implements RemoteSourceData<Integer, Integer> {
@Override
public Integer getData(Integer input) {
try {
System.out.println("fetch from remote Source0 " + new Timestamp(System.currentTimeMillis()));
Thread.sleep(2000L);
} catch (InterruptedException ex) {
// ignore
}
return 0;
}
}
private static class RemoteSource1 implements RemoteSourceData<Integer, String> {
@Override
public String getData(Integer i) {
try {
System.out.println("fetch from remote Source1 " + new Timestamp(System.currentTimeMillis()));
Thread.sleep(4000L);
} catch (InterruptedException ex) {
// ignore
}
return "foo";
}
}
private static class RemoteSource2 implements RemoteSourceData<Integer, String> {
@Override
public String getData(Integer i) {
try {
System.out.println("fetch from remote Source2 " + new Timestamp(System.currentTimeMillis()));
Thread.sleep(4000L);
} catch (InterruptedException ex) {
// ignore
}
return "bar";
}
}
}
Выход на это показывает, что Source1 и Source2 не выбираются параллельно:
fetch from remote Source0 2016-06-03 16:00:38.802
fetch from remote Source2 2016-06-03 16:00:40.803
fetch from remote Source1 2016-06-03 16:00:44.804
[bar, foo]
Если удалить часть(), а затем Source1 и Source2 извлекаются параллельно, но требует двух вызовов в Source0:
fetch from remote Source0 2016-06-03 16:05:48.027
fetch from remote Source0 2016-06-03 16:05:48.027
fetch from remote Source1 2016-06-03 16:05:50.028
fetch from remote Source2 2016-06-03 16:05:50.028
[foo, bar]
Как я могу получить один вызов Source0 и параллельных вызовов Source1 & source2?
Используйте .cache() вместо .share(). –