2016-06-28 3 views
3

Учитывая следующий код:CombineLatest и холодный наблюдаемые

 Observable<String> obs1 = Observable.just("1", "2", "3"); 
     Observable<String> obs2 = Observable.just("a", "b"); 
     Observable.combineLatest(obs1, 
           obs2, 
           (s1, s2) -> s1 + ":" + s2 
     ).subscribe(System.out::println); 

Я ожидал бы, чтобы напечатать что-то вроде следующего:

1:a 
2:a 
3:a 
3:b 

Но он печатает

3:a 
3:b 

Почему мой first Observable only, испускающий последний элемент? Активных подписок до combLatest нет, поэтому должно быть холодно. Как я могу убедиться, что все предметы из обоих наблюдаемых объединены вместе?

ответ

3

Это происходит потому, что у вас есть синхронные источники. Когда combineLatest подписывается на свои источники, он по умолчанию берет 128 элементов по умолчанию. Это означает, что ваш первый источник будет работать до завершения синхронно, а вторая подписка не произойдет до завершения. Поскольку первый источник один, все, кроме самого последнего элемента, удаляются. Как только второй источник подписывается, он найдет самое последнее из первого и объединится с ним.

Что вы пытались достичь? Если вы хотите попарно, используйте zip. Если вы хотите, чтобы все комбинации первого и второго, использовать flatMap:

obs1.flatMap(v -> obs2.map(w -> v + ":" + w)).subscribe(...) 
+0

Ну на самом деле мой код работает, потому что мой источник разве синхронный, я наткнулся на это при попытке записи испытание для этого.
То, что я пытаюсь достичь, является следующим:

    * запрос Mongodb, чтобы получить Observable с потоком существующих идентификаторов. * запрос AWS s3, чтобы получить Observable с потоком имен файлов * для каждого файла s3, проверьте, существует ли имя файла в mongo. Если нет, удалите файл из s3.
grub

+0

@akarnokd: с вашей плоской матрицей 'obs2' будет подписаться только после того, как будет выбрано первое значение' obs1'. С 'combLatest' подписка последовательна. Есть ли способ подписывать 'obs2' сразу после подписки' obs1'? – dwursteisen

0

Спасибо за ваш ответ, akarnokd. Предварительная выборка из 128 пунктов - очень полезная информация. Я решить мою проблему, переключая два наблюдаемые, так

Observable<String> obs1 = Observable.just("1", "2", "3", "4"); 
Observable<String> obs2 = Observable.just("4", "b", "1"); 
Observable.combineLatest(obs1, 
         obs2.toList(), 
         (s1, list) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1) 
      .filter(StringUtils::isNotEmpty) 
      .subscribe(System.out::println); 

теперь работает следующим образом:

Observable<String> obs1 = Observable.just("1", "2", "3", "4"); 
Observable<String> obs2 = Observable.just("4", "b", "1"); 
Observable.combineLatest(obs2.toList(), 
         obs1, 
         (list, s1) -> list.stream().anyMatch((listItem) -> s1.contains(listItem)) ? null : s1) 
      .filter(StringUtils::isNotEmpty) 
      .subscribe(System.out::println); 
Смежные вопросы