2015-04-07 2 views
1

Я пытаюсь сделать асинхронные вызовы отдыха, реализуя RxJava. Ниже реализация -Асинхронный вызов с RxJava -Observables

final Observable<List<A>> observableA = Observable.create(new Observable.OnSubscribe<List<A>>() { 
    @Override 
    public void call(Subscriber<? super List<A>> subscriber) { 
     try { 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onNext(//another Function call); 
       subscriber.onCompleted(); 
      } 
     } catch (Exception e) { 
      subscriber.onError(e); 
     } 
    } 
}); 

final Observable<List<B>> observableB = Observable.create(new Observable.OnSubscribe<List<B>>() { 
    @Override 
    public void call(Subscriber<? super List<B>> subscriber) { 
     try { 
      if (!subscriber.isUnsubscribed()) { 
       subscriber.onNext(//another Function call); 
       subscriber.onCompleted(); 
      } 
     } catch (Exception e) { 
      subscriber.onError(e); 
     } 
    } 
}); 


Observable<List<C>> reservationObserv = Observable.zip(observableA, observableB, new Func2< List<A>, List<B> , List<C>>() { 
    @Override 
    public List<C> call(final List<A> a, final List<B> b) { 
     // Merge the response 
     return c; 
    } 
}); 

В настоящее время, ObservableA сначала выполняется, а затем ObservableB. Может ли кто-нибудь предложить, почему вызовы не являются асинхронными.

Заранее спасибо.

, когда я исполняю следующим образом, observableB выполняется первым, а затем ObservableA

final Observable<List<A>> observableA = Observable.create(new Observable.OnSubscribe<List<A>>() { 

@Override 
public void call(final Subscriber<? super List<A>> subscriber) { 
    Runnable run = new Runnable() { 
     @Override 
     public void run() { 
      // Delay of 1000ms 
      subscriber.onNext(//calling a method); 
      subscriber.onCompleted(); 
     } 
    }; 
    executorService.execute(run); 
} 
}); 

final Observable<List<B>> observableB = Observable.create(new Observable.OnSubscribe<List<B>>() { 

@Override 
public void call(final Subscriber<? super List<B>> subscriber) { 
    Runnable run = new Runnable() { 
     @Override 
     public void run() { 
      // No delay 
      subscriber.onNext(//calling a method); 
      subscriber.onCompleted(); 
     } 
    }; 
    executorService.execute(run); 
} 
}); 


Observable<List<C>> observableC = Observable.zip(observableA, observableB, new Func2< List<A>, List<B> , List<C>>() { 
@Override 
public List<C> call(final List<A> a, final List<B> b) { 
    // Merge the response 
    return c; 
} 
}); 
+0

Когда вы говорите: «ObservableA сначала выполняется, а затем ObservableB», что вы подразумеваете под этим? Что запускает весь процесс? –

+0

Спасибо @Robert Harvey за форматирование кода. Поскольку RxJava помогает выполнять параллельным образом и zip ответ в конце, я попытался реализовать таким же образом. Но здесь звонки производятся синхронно. Я что-то упускаю? – user1879835

+1

Посмотрите на плановиков. По умолчанию RxJava является однопоточным. –

ответ

2

По умолчанию RxJava синхронно. Поэтому в вашем первом случае оператор zip будет подписаться на observableA, затем, когда observableA заполнит, подписаться на observableB.

Во втором случае, когда вы используете службу-исполнитель, вы деактиционно асинхронны.

Чтобы быть асинхронным с вашей первой версией, так как она зашла в комментарии, вы должны посмотреть на Schedulers и сообщить RxJava, в которой schdeulers должна выполнять ваша подписка.

Observable<List<C>> observableC = Observable.zip(
        observableA.subscribeOn(Schedulers.io()), 
        observableB.subscribeOn(Schedulers.io()), 
        (a, b) -> /** ... **/); 
observableC.subscribe(); 

Вы можете использовать разные планировщики, в зависимости от того, чего вы хотите достичь. (используя Schedulers.io() для ввода/вывода, ...)

Смежные вопросы