2015-02-18 4 views
2

В соответствии с документацией в реализации Java отсутствует withLatestFrom (которая отличается от combineLatest). Любая идея о том, как подражать этому? enter image description hereRxJava: как эмулировать withLatestFrom?

+1

Возможно, дубликат http://stackoverflow.com/questions/27203435/combinelatest-emit-only-when-one-of-the-streams-changes –

ответ

2

Учитывая a как основные наблюдаемые и b как «последнюю из» наблюдаемых, это псевдо-java8 лямбда должна делать то, что вы хотите:

a.publish(a' -> b.switchMap(y -> a'.map(x -> x + y))) 

Это первое публикует a в a', что позволяет ему быть подписанный повторно, без перезапуска потока. Затем каждый раз, когда выдается новый элемент из b, он повторно подписывается на текущий поток a, который объединяет в себе последний результат b с каждым выходом a.

Вы можете легко обернуть, что в качестве реализации RxJava-х Transformer, как это (также полу-псевдо, так что проверить мой синтаксис):

public class WithLatestFrom<T, U, V> implements Transformer<T, V> { 
    private final Func2<T, U, V> function; 
    private final Observable<U> latest; 

    private WithLatestFrom<T, U, V>(final Observable<U> latest, Func2<T, U, V> function) { 
    this.function = function; 
    this.latest = latest; 
    } 

    public static <T, U, V> WithLatestFrom<T, U, V> with(
     final Observable<U> latest, Func2<T, U, V> function) { 
    return new WithLatestFrom<T, U, V>(latest, function); 
    } 

    @Override 
    public Observable<V> call(final Observable<T> source) { 
    return source.publish((publishedSource) -> latest.switchMap((y) -> 
     publishedSource.map((x) -> function.call(x, y))); 
    } 

} 

Тогда вы можете повторно использовать его в коде, как:

a.compose(WithLatestFrom.with(b, (x, y) -> x + y)); 
+0

Кроме того, я только выяснил, что 1.0.7 имеет экспериментальный оператор withLatestFrom: https://github.com/ReactiveX/RxJava/releases/tag/v1.0.7 :) – lopar

+0

Yup, интересное решение в любом случае! Благодаря! –

0

Некоторые очень простая и наивная реализация:

@SuppressWarnings("unchecked") 
public static <T, U, V> Observable<T> combineLatestFrom(
    Observable<U> o1, 
    Observable<V> o2, 
    Func2<U, V, T> f) { 

    final Object nothing = new Object(); 
    return Observable.create(s -> { 

     AtomicReference<V> val2 = new AtomicReference<V>((V) nothing); 

     o1.subscribe(v -> { 
      val2.getAndUpdate(current -> { 
      if (current != nothing) { 
       s.onNext(f.call(v, current)); 
      } 
      return current; 
      }); 
     }, s::onError, s::onCompleted); 

     o2.subscribe(val2::set, s::onError); 

    }); 
} 

И этот метод может быть использован, как это:

combineLatestFrom(numbers, letters, (n, l) -> n + l) 
    .subscribe(System.out::println); 

Если здесь цифры и буквы Наблюдаемых из мраморной диаграммы - > результат будет таким, как ожидалось.

+0

Какова цель 'val1'? Ваш пример может быть упрощен, так как 'val1' кажется бесполезным? –

+0

И вы абсолютно правы - обновляя его! – meddle

Смежные вопросы