2016-12-14 3 views
2

У меня есть большое количество наблюдаемых и хочу объединить их в одно значение. Они обновлены с высокой частотой и считают, что необходимо использовать «потерю» стратегии противодавления. Я выбрал sample, но все, что угодно, было бы приемлемым. Я хотел бы знать, что самый эффективный способ (производительность и/или ресурс) делает это. Два подхода очевидны:Эффективная выборка большого числа наблюдаемых rx вместе

val list : Iterable[Observable[Double]] = ??? 
val f : (Double,Double) => Double = ??? 
(Observable combineLatest list) sample (1 second) reduce f 

или альтернативно

combineLatest (list map (_ sample (1 second))) reduce f 

Я думаю, что справедливо сказать, что это очень распространенный вариант использования. Чтобы сделать это менее абстрактным, представьте, что у меня много датчиков температуры, которые непрерывно излучают данные. Это были бы элементы моего list. Они производят температуру в единицах СИ, и я хочу рассчитать среднюю температуру (это f). Я также хочу иметь выбор, чтобы показать результат в градусах Цельсия или Фаренгейта.

  1. Какой из двух подходов выше (любые другие предложения приветствуются, хотя) было бы более эффективным пространством/временем?
  2. Скажем, я хотел отображать не только среднюю температуру, но и индивидуальную (возможно, преобразованную) температуру. Это влияет на ответ?
  3. Будет ли ваш ответ изменяться, если Observables поступает из разных источников (например, наблюдается в отдельных потоках), независимо от того, пришел ли он в один сериализованный поток (например, результат оператора groupBy по одному наблюдаемому).
  4. Принимая во внимание то, что я говорю выше о противодавлении, будет ли ваш ответ меняться, если это требование было отброшено (т. Е. Нет необходимости повторной выборки).
+0

Вы когда-нибудь находили решение? –

+0

Нет; Я перешел от RX. Я думаю, что для этого требуется специализированная структура данных, несмотря на то, что она является общей моделью. Дело не в том, что это сложно реализовать (как я уже сказал выше), но жесткий бит делает это с помощью монадического/коллекционного интерфейса. Например, посмотрите https://github.com/OpenHFT/Chronicle-Map. – Luciano

ответ

0

Я думаю, что as может быть полезен здесь. В моем случае у меня не просто куча наблюдаемых; У меня есть куча вещей, которые имеют наблюдаемых. Поэтому сначала я создал вспомогательный класс, чтобы связать каждую вещь с ее наблюдаемым:

public class Pair<K, V>{ 
    private final K mKey; 
    private final Observable<V> mValues; 

    public Pair(final K key, final Observable<V> values) { 
     mKey = key; 
     mValues = values; 
    } 
    public K key() { 
     return mKey; 
    } 

    public Observable<V> values() { 
     return mValues; 
    } 
} 

Затем я создал ObservableConverter, преобразующий Observable<Pair<K,V>> в Observable<Map<K,V>>. Он будет периодически испускает карту, содержащую последнюю V от каждого K:

public class MapLatest<K, V> implements ObservableConverter<Pair<K, V>, Observable<Map<K, V>>> { 
    private final PublishSubject<Map<K,V>> mPub = PublishSubject.create(); 
    private final Map<K, V> mMap = new ConcurrentHashMap<>(); 

    @Override 
    public Observable<Map<K, V>> apply(final Observable<Pair<K, V>> pairs) { 
     pairs.subscribe(this::subscribe); 
     return mPub; 
    } 

    private void subscribe(final Pair<K,V> pair) { 
     final K k = pair.key(); 
     pair.values().doOnNext(v -> mMap.put(k, v)).doOnTerminate(() -> mMap.remove(k)); 
    } 
} 

Моих «вещи» игроки, и я хочу, чтобы периодически строить R-дерево своих мест. Так что я пара игроков с их наблюдаемым местоположением, а затем применить MapLatest:

players.map(player -> new Pair<>(player, player.location())) 
    .as(new MapLatest<>()).subscribe(...); 

Обратите внимание, что я не показал какой-либо код в MapLatest, что на самом деле излучает ничего. Я рассматриваю различные способы, чтобы сделать это:

  • Construct MapLatest с Observable<?> и испускают всякий раз, когда что наблюдаемые излучает.
  • Построить MapLatest с Predicate<V>, который возвращает true, когда он должен испускать.
  • Дает MapLatest a emit метод и вызывают выбросы извне.

Также обратите внимание, что Observable<Map<K,V>> не прекращается. Для моих целей это намеренно.

Смежные вопросы