2015-09-22 6 views
2

Предположим, у нас есть совместная игра, в которой игроки получают и теряют очки. Любой игрок может присоединиться к игре в любой момент. После присоединения игрок никогда не уходит (для простоты). Общий балл команды - это просто сумма очков каждого игрока в любой момент времени. Игрок должен уметь видеть свой текущий счет и общий балл команды.Сумма последних значений среди групп

Итак, у нас есть поток десятков всех игроков, привязанных к отметкам времени.

Буквы обозначают игроков, цифры - баллы. Значения, такие как A4B4, означают, что очки очков двух игроков изменились одновременно. После groupBy у нас есть три потока: каждый для каждого игрока. Обратите внимание, что набор игроков не предопределен.

Time: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 

Alex: 5 4   3 4 9     8  12 
Ben:    2  4      6  10 
Katie:         5   9 

Что я ожидаю получить это поток сумм для каждого изменения баллов:

Time: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 
Total: 5 4  6 5 8 13   18 20 19 27 31 

Это тривиально. Но решение становится сложным.

Это входной поток.

Observable<Record> input = Observable.from(new Record[] { 
    new Record("Alex", 0, 5), 
    new Record("Alex", 1, 4), 
    new Record("Ben", 5, 2), 
    new Record("Alex", 6, 3), 
    new Record("Alex", 7, 4), 
    new Record("Ben", 7, 4), 
    new Record("Alex", 8, 9), 
    new Record("Katie", 12, 5), 
    new Record("Ben", 14, 6), 
    new Record("Alex", 15, 8), 
    new Record("Katie", 16, 9), 
    new Record("Ben", 16, 10), 
    new Record("Alex", 17, 12) 
}) 
.delay(e -> Observable.interval(e.timestamp, TimeUnit.SECONDS, scheduler)) 
.doOnCompleted(latch::countDown) 
.share(); 

Запись представляет собой простой кортеж: Record(String player, int timestamp, double score).

Давайте делить его на группы:

Observable< Observable<Record> > output = input 
    .groupBy(e -> e.player) 
    .map(g -> g.cache()) 

Стандартный toList метод ожидает Наблюдаемые для завершения. Но поток групп завершается только тогда, когда входной поток завершается (любой игрок может присоединиться к игре в любое время). Поэтому требуется новый трансформатор. Он возвращает список по вызову onNext вместо onComplete.

.compose(new ToListOnNextTransformer()) 

А потом мы генерируем новый Observable каждый раз, когда новая группа излучается (каждый раз, когда новый игрок присоединяется к игре). Этот Observable возвращает сумму последнего балла всех игроков.

.map(eachPlayerRecordsList -> Observable.combineLatest(eachPlayerRecordsList, (Object... eachPlayerRecordsArray) -> { 
    double total = Arrays 
     .stream(eachPlayerRecordsArray) 
     .mapToDouble(e -> ((Record)e).score) 
     .sum(); 
    int timestamp = Arrays 
     .stream(eachPlayerRecordsArray) 
     .mapToInt(e -> ((Record)e).timestamp) 
     .max() 
     .getAsInt(); 
    return new Record("Total", timestamp, total); 
})); 

Наконец, мы выводим значения из первой группы, пока второй игрок не присоединяется к игре. Затем мы выводим значения из второй группы до тех пор, пока третий игрок не соединится и т. Д. Это именно то, что делает switchOnNext.

Observable.switchOnNext(outputg) 
    .subscribe(System.out::println, e -> { ((Throwable)e).printStackTrace(); }); 

Пользовательский класс трансформатора:

private static class ToListOnNextTransformer implements Observable.Transformer< Observable<Record>, List< Observable<Record> > > { 
    private final List< Observable<Record> > list = new ArrayList<>(); 
    private final PublishSubject< List< Observable<Record> > > ret = PublishSubject.create(); 

    @Override 
    public Observable< List< Observable<Record> > > call(Observable< Observable<Record> > eachPlayerRecords) { 
    eachPlayerRecords.subscribe(playerRecords -> { 
     list.add(playerRecords); 
     ret.onNext(new ArrayList<>(list)); 
    }); 
    return ret; 
    } 
} 

Вопросов:

  • Можно ли упростить? Желательно, с реактивным подходом.
  • Разве я не нарушаю некоторые правила RxJava? Как использование абонента внутри абонента и т. Д.

Это искусственная задача. Я просто изучаю RxJava и пытаюсь понять реактивное программирование. Я надеюсь, что существует лучший способ, и я просто пропущу что-то очевидное.

The full source code of a test file.

ответ

2

Я не полностью понимаю ваш пример кода, но вы могли бы сделать обкатку сумму через scan:

source.scan(0, (a, b) -> a + b) 

Поэтому, учитывая поток событий лиц и их точек, вы можете создать две группы:

ConnectableObservable<Record> co = input.publish(); // replay, cache, etc. 

Observable<Pair<String, Integer>> userScores = co 
.groupBy(r -> r.name) 
.flatMap(g -> just(Pair.of(g.getKey(), g.scan(0, (a, b) -> a + b)))); 

Observable<Pair<String, Integer>> teamScores = co 
.groupBy(r -> r.team) 
.flatMap(g -> just(Pair.of(g.getKey(), g.scan(0, (a, b) -> a + b)))); 

userScores.filter(p -> p.name.equals("Alex")) 
.concatMap(p -> p.second.map(s -> Pair.of(p.first, s))) 
.subscribe(System.out::println); 

co.connect(); 
+0

Благодарим за ответ. К сожалению, бегущая сумма - это не то, что я имел в виду. Согласно приведенному выше примеру, у Алекса 9 очков на отметке времени 8. И Бен имеет 4 очка в тот же момент. Итак, общий балл их команды - 13. На отметке 12, Кэти присоединяется к игре с 5 очками. Счет Alex и Ben еще не изменился. Таким образом, общий балл становится равным 18. На отметке времени 14 оценка Бен составляет 6 (было 4). Общий балл составляет 20. – Qualtagh

+0

С другой стороны: отметка времени 12 баллов (9, 4, 5), ts 14 - (9, 6, 5), ts 15 - (8, 6, 5). Итак, общие баллы представляют собой суммы этих значений: ts 12 - total = 18, ts 14 - total = 20, ts 15 - total = 19. – Qualtagh

Смежные вопросы