2015-07-01 2 views
0

У меня есть 2 Subject (plusOne [itemId], минусOne [itemId]), которые испускают идентификаторы предметов, сумма которых должна быть изменена на одну (добавленную или вычитаемую).Сумма обновления с использованием Rx

Пользователь может отправлять несколько сигналов с одинаковым идентификатором элемента. В конце концов, я хочу подписаться на Observable, которые испускают Pair[itemId,amountToSet]

UI выглядит как список элементов строк, каждая из которых содержит «+» и «-» кнопки, которые вызывают соответствующую onNext (Itemid).

Я пробовал думать о решении проблемы с помощью группировки предметов и сокращения последовательности, но проблема в том, что reduce вызывается только тогда, когда вызывается onCompleted, в то время как мои субъекты не должны заканчивать их выбросы.

Вот код, который не может использовать уменьшить:

plusItem.asObservable().groupBy(id -> id).subscribe(new Action1<GroupedObservable<Long, Long>>() { 
    @Override 
    public void call(GroupedObservable<Long, Long> groupedObservable) { 
     System.out.println("Composed a group with key: " + groupedObservable.getKey()); 
     groupedObservable 
       .map(id -> 1) 
       .startWith(0) 
       .reduce((integer, integer2) -> integer + integer2) 
       .subscribe(new Action1<Integer>() { 
        @Override 
        public void call(Integer sum) { 
         System.out.println(Integer.toString(sum)); 
        } 
       }); 
    } 
}); 

Решение

private Observable<Pair<Long,Integer>> getUpdateObservable(
    Observable clickSource, 
     Observable<Map<Long,Integer>> initValues, 
     Func1<Long,Integer> groupMapFunc 

) { 
    return clickSource 
      .asObservable() 
      .groupBy(id -> id) 
      .flatMap(new Func1<GroupedObservable<Long, Long>, Observable<Pair<Long, Integer>>>() { 
       @Override 
       public Observable<Pair<Long, Integer>> call(GroupedObservable<Long, Long> groupedObservable) { 
        return groupedObservable 
          .map(groupMapFunc) 
          .scan((integer, integer2) -> integer + integer2) 
          .withLatestFrom(initValues, new Func2<Integer, Map<Long, Integer>, Integer>() { 
           @Override 
           public Integer call(Integer integer, Map<Long, Integer> basket) { 
            return basket.get(groupedObservable.getKey()) + integer; 
           } 
          }) 
          .map(new Func1<Integer, Pair<Long, Integer>>() { 
           @Override 
           public Pair<Long, Integer> call(Integer integer) { 
            return new Pair<>(groupedObservable.getKey(), integer); 
           } 
          }); 
       } 
      }); 
} 

Можете ли вы предложить что-нибудь?

Спасибо!

+1

rxjs является тег для реактивных расширений для JavaScript – paulpdaniels

+0

@paulpdaniels, так что? Я не против получать предложения от сообщества javascript – midnight

ответ

1

Вы можете использовать scan будет возвращать промежуточные значения

plusItem 
    .asObservable() 
    .groupBy(id -> id) 
    .subscribe(new Action1<GroupedObservable<Long, Long>>() { 
    @Override 
    public void call(GroupedObservable<Long, Long> groupedObservable) { 
     System.out.println("Composed a group with key: " + groupedObservable.getKey()); 
     groupedObservable 
       .map(id -> 1) 
       .scan(0, (integer, integer2) -> integer + integer2) 
       .subscribe(new Action1<Integer>() { 
        @Override 
        public void call(Integer sum) { 
         System.out.println(Integer.toString(sum)); 
        } 
       }); 
    } 
}); 
Смежные вопросы