2016-01-28 9 views
3

Я хочу синхронно испускать два объекта Observable (которые являются асинхронными), один за другим, где он возвращает , первый Испускаемый объект наблюдения. Если первый из них терпит неудачу, он не должен выделять второй.RxJava; Как испускать наблюдаемые синхронно

Допустим, у нас есть один Observable, который подписывает пользователя в, а другой Observable, который автоматически выбирает учетную запись пользователя, после подписания в

Это то, что я пробовал:.

public Observable<AccessToken> signInAndSelectAccount(String username, String password) 
{ 

    Observable<AccessToken> ob1 = ...; // Sign in. 
    Observable<Account> ob2 = ...; // Select account. 


    return Observable.zip(
      ob1, 
      ob2, 
      new Func2<AccessToken, Account, AccessToken>() { 
       @Override 
       public AccessToken call(AccessToken accessToken, Account account) 
       { 
        return accessToken; 
       } 
      }); 
} 

Это к сожалению, не работает для моего варианта использования. Он будет излучать/вызывать оба наблюдаемых параллельно, начиная с «ob1».

Кто-нибудь сталкивался с аналогичным вариантом использования? Или есть идея о том, как заставить наблюдаемые ждать друг друга синхронно, где может быть возвращено первое излучаемое?

Заранее спасибо.

ответ

5

В реактивном программировании нет такого термина, как «ожидание». Вам нужно подумать о создании потока данных, где один Observable может быть запущен другим. В вашем случае после получения token вам необходимо получить account. Это может выглядеть следующим образом:

Observable<Account> accountObservable = Observable.create(new Observable.OnSubscribe<AccessToken>() { 
    @Override public void call(Subscriber<? super AccessToken> subscriber) { 
     subscriber.onNext(new AccessToken()); 
     subscriber.onCompleted(); 
    } 
}).flatMap(accessToken -> Observable.create(new Observable.OnSubscribe<Account>() { 
    @Override public void call(Subscriber<? super Account> subscriber) { 
     subscriber.onNext(new Account(accessToken)); 
     subscriber.onCompleted(); 
    } 
})); 
1

Я не знаю, Java, но решение в Scala, вероятно, будет это, надеюсь, что это читаемо вам

import rx.lang.scala.Observable 

class AccessToken 
class Account 

case class TokenAndAccount(token: AccessToken, account: Account) 

val accessTokenSource = Observable.just(new AccessToken) 
val accountSource = Observable.just(new Account) 

accessTokenSource 
    .flatMap(token ⇒ accountSource.map(account ⇒ TokenAndAccount(token, account))) 
    .subscribe(tokenAndAccount ⇒ println(tokenAndAccount)) 

В основном flatMap будет убедиться, что accountSource.map... используется только после того, как был выдан токен от accessTokenSource. Внутри accountSource.map мы объединяем полученный токен и учет вместе для последующего использования в subscribe.

flatMap является одним из самых полезных операторов, обязательно прочитайте его документы и, возможно, некоторые учебники.

+0

Привет Tomas, спасибо за это. Я займусь этим! –

3

Вы также можете использовать rx.observables.BlockingObservable например .:

BlockingObservable.from(/**/).single(); 
Смежные вопросы