2016-04-23 3 views
7

У меня есть следующая проблема:RxJava разделяет выбросы наблюдаемых в период между несколькими абонентами

У меня есть наблюдаемый, что делает какую-то работу, но и другие наблюдаемыми нужен вывод, что наблюдаемый на работу. Я попытался несколько раз подписаться на одно и то же наблюдаемое, но внутри журнала я вижу, что исходный наблюдаемый инициируется несколько раз.

вот мои Наблюдаемые то будет создать объект:

Observable.create((Observable.OnSubscribe<Api>) subscriber -> { 
      if (mApi == null) { 
       //do some work 
      } 
      subscriber.onNext(mApi); 
      subscriber.unsubscribe(); 
     }) 

тот мой наблюдаемым, который нуждается объект

loadApi().flatMap(api -> api....())); 

Я использую

.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread()) 
       .unsubscribeOn(Schedulers.io() 

на всех наблюдаемых.

ответ

12

Я не уверен, что правильно понял ваш вопрос, но считаю, что вы ищете способ поделиться выбросами наблюдаемого между несколькими подписчиками. Есть несколько способов сделать это. С одной стороны, вы могли бы использовать Connectable Observable так:

ConnectableObservable<Integer> obs = Observable.range(1,3).publish(); 
obs.subscribe(item -> System.out.println("Sub A: " + item)); 
obs.subscribe(item -> System.out.println("Sub B: " + item)); 
obs.connect(); //Now the source observable starts emitting items 

Выход:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

В качестве альтернативы можно использовать PublishSubject:

PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject 
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject 
subject.subscribe(item -> System.out.println("Sub B: " + item));  
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable 

Выход:

Sub A: 1 
Sub B: 1 
Sub A: 2 
Sub B: 2 
Sub A: 3 
Sub B: 3 

Оба Эти примеры являются однопоточными, но вы можете легко добавлять вызовы registerOn или subscirbeOn, чтобы сделать их асинхронными.

+0

да я хочу, первый результат с наблюдаемым на каждом подписываться – H3x0n

+0

, что я должен делать, когда я не знаю, как часто мои наблюдаемые будут подписаны, и когда абонент должен получить результат, прежде чем все будут подписаны. – H3x0n

+0

в значительной степени ответил на [этот вопрос] (http://stackoverflow.com/questions/35951942/single-observable-with-multiple-subscribers) – JohnWowUs

1

Прежде всего использование Observable.create сложно и легко ошибиться. Вам нужно что-то вроде

Observable.create(subscriber -> { 
     if (mApi == null) { 
      //do some work 
     } 
     if (!subscriber.isUnsubscribed()) { 
      subscriber.onNext(mApi); 
      subscriber.onCompleted(); 
      // Not subscriber.unsubscribe(); 
     }    
}) 

Вы можете использовать

ConnectableObservable<Integer> obs = Observable.just(1).replay(1).autoConnect(); 

Всех последующих абонент должны получить сингл, испускаемого товара

obs.subscribe(item -> System.out.println("Sub 1 " + item)); 
obs.subscribe(item -> System.out.println("Sub 2 " + item)); 
obs.subscribe(item -> System.out.println("Sub 3 " + item)); 
obs.subscribe(item -> System.out.println("Sub 4 " + item)); 
+0

Я пробовал ваш, но на подписку не вызывается. Мое наблюдаемое является публичным статическим финалом. – H3x0n

+0

Вы пробовали свой оригинал, наблюдаемый с помощью .publish(). Replay(). Autoconnect()? – JohnWowUs

+0

когда я использую только .share() он работает. – H3x0n