2016-08-26 3 views
2

Допустим, у нас есть источник Observable из Ints:Как произвести наблюдаемую из другого

val source:Observable[Int] 

Я хотел бы создать еще Observable, производя значения, разность в первой появилось значение в источника больше 10 :

def detect() = Observable[Int](
    subscriber => 
    if (!subscriber.isUnsubscribed) { 
     var start:Option[Int] = None 
     source.subscribe(
     item => { 
      if (start.isEmpty) { 
      start = Option(item) 
      } 
      else { 
      start.filter(v => Math.abs(item - v) > 10).foreach { 
       item => subscriber.onNext(item) 
      } 
      } 
     } 
    ) 
     subscriber.onCompleted() 
    } 
) 

Здесь я использовал вар начать провести первое значение из источника наблюдаемых.

Есть ли способ упростить этот код? Мне не нравится этот подход с присвоением значения var

ответ

3

Вот что я придумал:

import rx.lang.scala.Observable 

val source = Observable.from(List(5, 2, 3, 16, -40, 2, -70, 50)) 

source.scan(Option.empty[(Int, Int)]) { (acc, next) => 
    acc.map(_.copy(_2 = next)) orElse Some((next, next)) 
}.collect { 
    case Some((start, current)) if math.abs(start - current) > 10 => current 
}.subscribe(x => println(x)) 

гравюр

16 
-40 
-70 
50 

В основном сканирование хранит аккумулятор, который может быть неинициализирован (None), или может удерживать пару: первое значение и последний элемент, испускаемый из источника. Затем мы собираем только те элементы, которые соответствуют вашему предикату.

0

Вам просто нужно применить оператор filter, который создает новое наблюдаемое, которое отражает выбросы наблюдаемого источника, но пропускает те, для которых тесты предикатов ложны:

val filtered = source.filter(v => Math.abs(item - v) > 10) 
+0

Как перенести первое значение источника? Я сравниваю каждое следующее значение источника с первым значением – Nyavro

+0

А, я неправильно прочитал ваш вопрос; другой ответ кажется правильным. –

Смежные вопросы