2015-03-04 2 views
21

Вот изображение того, что я пытаюсь выполнить.Split Rx Наблюдается в нескольких потоках и обрабатывается отдельно

--abca - БББ - это

расщепляются на

--a ----- ------- а -> поток

- --- Ь ------ БББ --- -> б поток

------ с ---------- -> с потоком

Тогда , может быть

a.subscribe() 
b.subscribe() 
c.subscribe() 

До сих пор все, что я нашел, разделил поток с помощью groupBy(), но затем свернул все обратно в один поток и обработал их все в одной и той же функции. То, что я хочу сделать, это обрабатывать каждый производный поток по-другому.

То, как я делаю это прямо сейчас, представляет собой набор фильтров. Есть лучший способ сделать это?

ответ

7

Вам не нужно сворачивать Observables от groupBy. Вы можете подписаться на них.

Что-то вроде этого:

String[] inputs= {"a", "b", "c", "a", "b", "b", "b", "a"}; 

    Action1<String> a = s -> System.out.print("-a-"); 

    Action1<String> b = s -> System.out.print("-b-"); 

    Action1<String> c = s -> System.out.print("-c-"); 

    Observable 
      .from(inputs) 
      .groupBy(s -> s) 
      .subscribe((g) -> { 
       if ("a".equals(g.getKey())) { 
        g.subscribe(a); 
       } 

       if ("b".equals(g.getKey())) { 
        g.subscribe(b); 
       } 

       if ("c".equals(g.getKey())) { 
        g.subscribe(c); 
       } 
      }); 

Если заявления выглядят своего рода некрасиво, но по крайней мере, вы можете обрабатывать каждый поток отдельно. Возможно, есть способ избежать их.

+0

Да, я бы хотел избежать того, если возможно. Однако, если это сработает, то это будет выглядеть немного чище, так как все его в одном месте, а не делать фильтры в исходном потоке. Благодаря! –

+0

Работал как очарование! –

+0

Прохладный! Я уточню свой ответ, если выясню, как избавиться от операторов 'if'. – ihuk

31

Легкий как пирог, просто использовать filter

пример, в Скале

import rx.lang.scala.Observable 

val o: Observable[String] = Observable.just("a", "b", "c", "a", "b", "b", "b", "a") 
val hotO: Observable[String] = o.share 
val aSource: Observable[String] = hotO.filter(x ⇒ x == "a") 
val bSource: Observable[String] = hotO.filter(x ⇒ x == "b") 
val cSource: Observable[String] = hotO.filter(x ⇒ x == "c") 

aSource.subscribe(o ⇒ println("A: " + o), println,() ⇒ println("A Completed")) 

bSource.subscribe(o ⇒ println("B: " + o), println,() ⇒ println("B Completed")) 

cSource.subscribe(o ⇒ println("C: " + o), println,() ⇒ println("C Completed")) 

Вам просто нужно, чтобы убедиться, что источник наблюдаемым жарко. Самый простой способ - это share.

+2

Что делать, если вы хотите, чтобы начальное наблюдение было холодным? –

+7

@double_squeeze просто использовать 'publish' вместо' share' и вызывать 'connect', когда подписываются все подписчики. –

+0

Активировано из-за комментария от @Krzysztof Skyrzynecki – CrazyBS

Смежные вопросы