2015-11-24 3 views
2

У меня есть Observable<String>. Я хотел бы превратить это в Map<String, Int>, который сообщает мне количество вхождений для каждой отдельной строки.ReactiveX: рассчитать частоту отдельных элементов в наблюдаемом

Наблюдаемый содержит ~ 1 миллиард элементов, из которых 1000 различны (поэтому сохранение всего набора данных в ОЗУ не является вариантом). В настоящее время я перебираю по Observable и обновляю HashMap. Я также должен следить за тем же потоком, чтобы избежать условий гонки. Тем не менее, получить частоту элемента следует по своей сути легко распараллелить, поэтому было бы неплохо воспользоваться этим.

Есть ли способ сделать это?

+0

Поскольку источник данных является последовательным и операция обновления O (1), я не уверен, что вы выиграете, пройдя параллельно. – akarnokd

+0

@akarnokd Источник данных рассчитан на несколько потоков и с моим текущим методом, который я должен синхронизировать, что замедляет работу. –

ответ

3

Вы можете использовать groupBy вместо того, чтобы поддерживать HashMap самостоятельно. groupBy создаст Observable для каждого ключа, и вы можете подписаться на него в другом Планировщике. Например,

public class KeyCounter { 
    int key; 
    long count; 

    public KeyCounter(int key, long count) { 
     this.key = key; 
     this.count = count; 
    } 

    @Override 
    public String toString() { 
     return "key: " + key + " count: " + count; 
    } 
} 

@Test 
public void foo() { 
    Observable<Integer> o = Observable.just(1, 2, 3, 2, 1); 
    o.groupBy(i -> i).flatMap(
     group -> 
      group.subscribeOn(Schedulers.computation()).countLong().map(count -> new KeyCounter(group.getKey(), count)) 
    ).subscribe(System.out::println); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 
Смежные вопросы