2016-11-07 3 views
1

I m using rxjava 1.2.2.rxjava получение максимального значения из буфера

Начиная с моего списка, я хочу заполнить буфер, а затем отфильтровать элемент Max в буфере так, чтобы, например, каждые 5 секунд, нужно было испускать фильтр Max Item.

Observable<Item> EventEmitter = Observable.from(itemsList); 

Observable<List<Item>> tapBufferEmitter = tapEventEmitter.buffer(5, TimeUnit.SECONDS); 

MathObservable.from(tapBufferEmitter).max(new Comparator<List<Item>>() { 

      @Override 
      public int compare(List<Item> o1, List<Item> o2) { 
       int m1 =o1.getVal(); 
       int m2 = o1.getVal(); 
       if (m1 == m2){ 
        return 0; 
       } else if (m1 > m2){ 
        return 1; 
       } else { 
        return -1; 
       }       
      } 
     }).subscribeOn(Schedulers.from(executor1)) 
     .subscribe(s -> { 
      System.out.println("Called thread: " + Thread.currentThread().getId()); 

      syso.("Max Item is:" + s.getId()); 
     }, e -> System.out.println(e.getMessage())); 

Но, конечно, код фрагмента выше doen t work. Я не хочу сравнивать 2 списка o1 и o2, но хочу просто сравнить элемент того же списка.

Является ли оператор max правильным выбором? Имейте в виду, что я не сравниваю целые числа, кроме элементов. Каждый элемент представляет собой шарик с фиксированным полем. Я хочу тот, у которого максимальное значение этого поля.

Как выбрать максимум из буфера? Спасибо

+0

ли вам импортируйте libary RxJavaMath, потому что MathObservable не находится в пакете RxJava. –

+0

да, конечно ... но я задаюсь вопросом о правильной конкатенации операторов. – Alex

+0

Я отредактировал вопрос ... вы можете прочитать его снова? – Alex

ответ

1

Я написал пример, как использовать оператор MathObservable.max. Обратите внимание, что я использовал окно вместо буфера, потому что буфер возвращает список, и окно даст мне Observable, который я могу повторно использовать с помощью flatMap и MathObservablen. Затем MathObservable вычислит максимальное значение данного окна (видимое из 5 элементов).

Gradle:

compile 'io.reactivex:rxjava:1.2.1' 
compile 'io.reactivex:rxjava-math:1.0.0' 

Окно:

@Test 
public void windowMaxTest() throws Exception { 
    Observable<Integer> just = Observable.just(10, 9, 8, 4, 7, 5, 6, 8, 4, 3); 

    Observable<Integer> integerObservable1 = just.window(5) 
      .flatMap(integerObservable -> { 
       return MathObservable.max(integerObservable); 
      }); 

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>(); 

    integerObservable1.subscribe(testSubscriber); 

    testSubscriber.awaitTerminalEvent(); 
    testSubscriber.assertValues(10, 8); 
} 

Буфер:

@Test 
public void bufferMaxTest() throws Exception { 
    Observable<Integer> just = Observable.just(10, 9, 8, 4, 7, 5, 6, 8, 4, 3); 

    Observable<Integer> integerObservable1 = just.buffer(5) 
      .flatMap(integerObservable -> { 
       return MathObservable.max(Observable.from(integerObservable)); 
      }); 

    TestSubscriber<Integer> testSubscriber = new TestSubscriber<>(); 

    integerObservable1.subscribe(testSubscriber); 

    testSubscriber.awaitTerminalEvent(); 
    testSubscriber.assertValues(10, 8); 
} 

пользовательского объекта ::

class Item { 
    public int value; 

    public Item(int value) { 
     this.value = value; 
    } 
} 

@Test 
public void test3214() throws Exception { 
    final Item max1 = new Item(3); 
    final Item max2 = new Item(6); 
    final List<Item> myListOfItem = Arrays.asList(new Item(1), new Item(2), max1, new Item(4), new Item(5), max2); 

    Observable<Item> itemObservable1 = Observable 
      .from(myListOfItem) 
      .buffer(3) 
      .flatMap(itemObservable -> { 
       Observable<Item> from = Observable.from(itemObservable); 

       return MathObservable.from(from) 
         .max((item, t1) -> { 
          return Integer.compare(item.value, t1.value); 
         }); 
      }); 

    TestSubscriber<Item> testSubscriber = new TestSubscriber<>(); 

    itemObservable1.subscribe(testSubscriber); 

    testSubscriber.awaitTerminalEvent(); 

    testSubscriber.assertValues(max1, max2); 
} 
+0

Как я могу интегрировать его с буфером? используя оператор flatmap? – Alex

+0

Я добавил буфер-тест к моему ответу. –

+0

Извините еще раз ... i m совершенно новый с rx, мой ввод - это список , поэтому я не могу использовать справедливого оператора. Cna вы обновляете предыдущий пример, используя стартовый список? – Alex

0

в целом, .reduce() является стандартным выбором для получения мин/макс/ср/сумму и т.д.

reduce(0, /*return Math.max(lhs, rhs)*/) 

так что вся операция будет что-то вроде этого -

source.window(/*...*/).flatMap(/*return .reduce(0, /*return Math.max(lhs, rhs)*/)) 
+0

. .flatMap (/ * return .reduce (0,/* return Math.max (lhs, rhs) * /)) i означает return .reduce .... – Alex

+0

от оригинала Observable вы создаете Наблюдаемый <Наблюдаемый > с .window(), то вы вызываете на него .flatMap() и оператор _inside_ .flatMap(), который вы возвращаете .reduce (0, Math.max()) в аргументе .flatMap() –

+0

, но что, если у моего элемента есть и я должен исправить элемент, у которого есть значение item.getIndex()? – Alex