2016-09-15 2 views
2

У меня есть список повторяющихся элементов, скажут:Rx эквивалент COUNT с GROUP BY?

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 

Я хотел бы сгруппировать их по их стоимости вместе с сколько раз они появляются, так что результат будет парой:

{"A", 3}, {"B", 1}, {"C", 2} 

основном эквивалент оператора SQL, как SELECT x, COUNT(1) GROUP BY x;

я только так далеко, чтобы назвать GroupBy на них:

source.groupBy(x -> x, x -> 1) 

Но это трансформирует поток в GroupedObservables, и я не мог найти хороший пример того, как двигаться дальше с ними. Я пробовал reduce(), но это не очень хорошо, так как после groupBy() он хочет уменьшить GroupedObservables, а не элементы внутри каждой группы.

Возможно ли это с помощью GroupedObservables? Возможно ли любой другой способ достичь желаемого результата?

+1

Понимаете ли вы, что если у вас был горячий наблюдаемый или тот, у которого нет сигнала «OnComplete», что вы не можете выполнить это вычисление? – Enigmativity

+0

Нет, я об этом не думал, но, похоже, это понятно, спасибо. На самом деле, у меня есть список, выпущенный как целое, а не элемент по элементу в моем случае (я просто упростил код для вопроса), поэтому, я думаю, это не проблема, верно? –

+0

Да, но если это холодный наблюдаемый, который испускает все его значения, вам может и не понадобиться Rx. Для чего вы пытаетесь это сделать? – Enigmativity

ответ

8

Следующий код:

source.groupBy(val -> val) 
    .flatMap(
     gr -> gr.count() 
       .map(count -> new Pair<>(gr.getKey(), count) 
    ) 
).subscribe(System.out::println); 

напечатать бы из:

A=3 
B=1 
C=2 
+0

Действительно элегантный, спасибо! –

1
Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 
    Observable<KeyValue<String, Integer>> countStream = source 
      .groupBy(val -> val) 
      .flatMap(obs -> obs.count().flatMap(cnt -> Observable.just(new KeyValue<>(obs.getKey(), cnt)))); 

    private static class KeyValue<K, V> { 

    private final K key; 
    private final V val; 

    public KeyValue(K key, V val) { 
     this.key = key; 
     this.val = val; 
    } 

    public K getKey() { 
     return key; 
    } 

    public V getVal() { 
     return val; 
    } 
} 
1

Другой способ заключается в использовании collect или collectInto метод, как показано ниже.

Observable<String> source = Observable.just("A", "B", "A", "C", "C", "A"); 

source.collectInto(new HashMap<String, MutableInt>(), (map, elem) -> { 
    if (map.containsKey(elem)) { 
    map.get(elem).increment(); 
    } else { 
    map.put(elem, new MutableInt(1)); 
    } 

}).subscribe(System.out::println); 

Кстати, если бы мы использовали Reactor, collect это наводит на мысль способ сделать это, потому что в случае большого числа групп, flatMap после GroupBy будет висеть.

От javadoc of Reactor

Обратите внимание, что группаЯ лучше всего работает с низкой мощностью групп, поэтому выбрал вашу функцию keyMapper соответственно.

Примечательно, что если критерии создают большое количество групп, это может привести к зависанию, если группы не будут потребляться надлежащим образом по течению (например, из-за плоской карты с параметром maxConcurrency, который установлен слишком низким).

Существует также github issue, связанный с этим.