2016-06-03 2 views
1

В приведенном ниже примере кода я пытаюсь извлечь из удаленного источника 1 и удаленного источника2 - для обоих из них требуется вход с удаленного источника. Я хотел бы получить из source0 только один раз и извлечь из source1 & source2 одновременно.Как делиться RxJava параллельно наблюдаемым?

public class TestReactiveX5 { 
    public static void main(String[] args) throws Exception { 
    Scheduler scheduler = Schedulers.io(); 
    RemoteSourceData<Integer, Integer> source0 = new RemoteSource0(); 
    RemoteSourceData<Integer, String> source1 = new RemoteSource1(); 
    RemoteSourceData<Integer, String> source2 = new RemoteSource2(); 

    Observable<Integer> i = Observable.just(1); 
    Observable<Integer> i0 = i.map(input -> source0.getData(input)).share(); 
    Observable<String> fooSource = i0.map(input0 -> source1.getData(input0)).subscribeOn(scheduler); 
    Observable<String> barSource = i0.map(input0 -> source2.getData(input0)).subscribeOn(scheduler); 

    Observable<List<String>> merged = Observable.merge(fooSource, barSource).toList(); 
    merged.subscribe(val -> System.out.println(val)); 

    Thread.sleep(15000); 
    } 

    @FunctionalInterface 
    private static interface RemoteSourceData<IN, OUT> { 
    public OUT getData(IN input); 
    } 

    public static class RemoteSource0 implements RemoteSourceData<Integer, Integer> { 
    @Override 
    public Integer getData(Integer input) { 
     try { 
     System.out.println("fetch from remote Source0 " + new Timestamp(System.currentTimeMillis())); 
     Thread.sleep(2000L); 
     } catch (InterruptedException ex) { 
     // ignore 
     } 
     return 0; 
    } 
    } 

    private static class RemoteSource1 implements RemoteSourceData<Integer, String> { 
    @Override 
    public String getData(Integer i) { 
     try { 
     System.out.println("fetch from remote Source1 " + new Timestamp(System.currentTimeMillis())); 
     Thread.sleep(4000L); 
     } catch (InterruptedException ex) { 
     // ignore 
     } 
     return "foo"; 
    } 
    } 

    private static class RemoteSource2 implements RemoteSourceData<Integer, String> { 
    @Override 
    public String getData(Integer i) { 
     try { 
     System.out.println("fetch from remote Source2 " + new Timestamp(System.currentTimeMillis())); 
     Thread.sleep(4000L); 
     } catch (InterruptedException ex) { 
     // ignore 
     } 
     return "bar"; 
    } 
    } 
} 

Выход на это показывает, что Source1 и Source2 не выбираются параллельно:

fetch from remote Source0 2016-06-03 16:00:38.802 
fetch from remote Source2 2016-06-03 16:00:40.803 
fetch from remote Source1 2016-06-03 16:00:44.804 
[bar, foo] 

Если удалить часть(), а затем Source1 и Source2 извлекаются параллельно, но требует двух вызовов в Source0:

fetch from remote Source0 2016-06-03 16:05:48.027 
fetch from remote Source0 2016-06-03 16:05:48.027 
fetch from remote Source1 2016-06-03 16:05:50.028 
fetch from remote Source2 2016-06-03 16:05:50.028 
[foo, bar] 

Как я могу получить один вызов Source0 и параллельных вызовов Source1 & source2?

+0

Используйте .cache() вместо .share(). –

ответ

0

Источники не запрашиваются параллельно, потому что они используют тот же поток, который вы спали между операциями из разных источников.

Я добавил некоторые дополнительные протоколирования для отображения выполнения потоков для вас простой пример:

выборки из удаленного Source0 2016-06-03 23: 28: 35,426 Thread = Thread [RxIoScheduler-2,5, главный] fetch from remote Source1 2016-06-03 23: 28: 37.426 Thread = Thread [RxIoScheduler-2,5, main] fetch from remote Source2 2016-06-03 23: 28: 41.426 Thread = Thread [RxIoScheduler-2, 5, main] [foo, bar]

Как вы можете видеть, это тот же поток для всех задач, которые ожидают edly идет спать на 2s и 4s.

Я пробовал различные конфигурации из subscribeOn и observeOn и не могу сказать, что это совершенно прозрачно для меня, почему RxJava делает то, что она делает в некоторых случаях, но вот то, что работает для меня:

public static void main(String[] args) throws Exception { 
    Scheduler scheduler = Schedulers.io(); 
    RemoteSourceData<Integer, Integer> source0 = new RemoteSource0(); 
    RemoteSourceData<Integer, String> source1 = new RemoteSource1(); 
    RemoteSourceData<Integer, String> source2 = new RemoteSource2(); 

    Observable<Integer> i = Observable.just(1).subscribeOn(scheduler); 
    Observable<Integer> i0 = i.map(source0::getData).share(); 
    Observable<String> fooSource = i0.observeOn(scheduler).map(source1::getData); 
    Observable<String> barSource = i0.observeOn(scheduler).map(source2::getData); 

    Observable<List<String>> merged = Observable.merge(fooSource, barSource).toList(); 
    merged.subscribe(System.out::println); 

    Thread.sleep(15000); 
} 

Выход:

fetch from remote Source0 2016-06-03 23: 37: 11.264 Thread = Thread [RxIoScheduler-3,5, main] fetch from remote Source1 2016-06-03 23: 37: 13.265 Thread = Thread [RxIoScheduler -2,5, основной] выборки из удаленного source2 2016-06-03 23: 37: 13,265 резьбы = резьба [RxIoScheduler-4,5, главный] [Foo, бар]

Теперь fooSource и barSource посвятил темы, каждые и работать параллельно ,