2015-08-13 2 views
3

Я вареные мою проблему вниз, в следующем фрагменте кода:RxJava GroupBy и последующее блокирование операций (OnComplete не хватает?)

Observable<Integer> numbers = Observable.just(1, 2, 3); 

Observable<GroupedObservable<Integer,Integer>> outer = numbers.groupBy(i->i%3); 
System.out.println(outer.count().toBlocking().single()); 

который блокирует нескончаемо. Я был readingseveralposts и считаю, что я понимаю проблему: GroupedObservables не будет вызывать onComplete, пока их внутренние Observables также не будут завершены. К сожалению, хотя я до сих пор не могу получить вышеприведенный фрагмент для печати!

Например, следующее:

Observable<Integer> just = Observable.just(1, 2, 3); 

Observable<GroupedObservable<Integer,Integer>> groupBy = just.groupBy(i->i%3); 
groupBy.subscribe(inner -> inner.ignoreElements()); 
System.out.println(groupBy.count().toBlocking().single()); 

еще ничего не делает. Я неправильно понял проблему? Есть ли еще одна проблема? Короче говоря, как я могу использовать вышеприведенные фрагменты?

Большое спасибо заранее, Дэн.

ответ

1

Да, вы должны потреблять группы определенным образом. Второй пример не работает, потому что у вас есть две независимые подписки на операцию группировки.

Обычно решение равно flatMap, но не с ignoreElements, потому что это только что завершится, и count не получит никаких элементов. Вместо этого вы можете использовать takeLast(1):

Observable.just(1, 2, 3) 
    .groupBy(k -> k % 3) 
    .flatMap(g -> g.takeLast(1)) 
    .count() 
    .toBlocking() 
    .forEach(System.out::println); 
+0

Привет, спасибо за ответ. К сожалению, у меня все еще есть проблемы, когда я применяю это к своему делу. Что делать, если я хочу считать элементы трех групп? По сути, я хочу что-то вроде: 1: 1, 2: 1, 3: 1. – Dan

+0

или, скорее, 0: 1, 1: 1, 2: 1! – Dan

Смежные вопросы