2017-01-18 6 views
0

У меня есть данные, которые непрерывно испускаются с аппаратного устройства. Эти данные должны быть сначала отправлены в webservice A, который возвращает поток результатов через некоторое время, если поступит достаточное количество данных. Затем каждый результат должен быть перенаправлен, как только он поступит в webservice B, который, в свою очередь, возвращает поток разных результатов через некоторое время, если придут достаточные результаты из A. Каждый веб-сервис имеет асинхронный API обратного вызова. Также необходима настройка соединения, прежде чем данные будут отправлены в первый раз на каждый веб-сервис.RxJava: сеть асинхронных веб-сервисов обратного вызова

Как я могу сопоставить это с RxJava?

ответ

2

flatMap и concatMap являются основными инструментами для асинхронной цепочки.

Вам необходимо обернуть ваши веб-службы в Future. И ваше аппаратное устройство в источник Observable. Тогда же просто, как:

class WebServices { 
    Future<Response1> callService1(parameters) { ... } 
    Future<Response2> callService2(parameters) { ... } 
} 

hardwareSource 
    .flatMap(v -> Observable.fromFuture(callService1(...))) 
    .flatMap(r1 -> Observable.fromFuture(callService2(...))) 
    .subscribe(r2 -> System.out.println(r2)); 

В случае, если веб-сервисы получать и отправлять серию сообщений, которые они должны быть завернуты в Observable с. И обработка трубопровода будет выглядеть так:

class WebServices { 
    Observable<Response1> sendToService1(parameters) { ... } 
    Observable<Response2> sendToService2(parameters) { ... } 
} 

hardwareSource 
    .flatMap(v -> sendToService1(...)) 
    .flatMap(r1 -> sendToService2(...)) 
    .subscribe(r2 -> System.out.println(r2)); 

И в случае входящих и исходящих потоков для веб-сервисов не строго коррелируют (ответы, непосредственно не связанных с запросами), то я бы осуществлять эти услуги как классы, разоблачающих как Observer и Observable интерфейсы ,

// wire them up 
hardwareSource.getObservable() 
    .subscribe(webService1.getObserver()); 
webService1.getObservable() 
    .subscribe(webService2.getObserver()); 
webService2.getObservable() 
    .subscribe(resultHandler); 

// initiate connections 
webService2.connect() 
webService1.connect() 
hardwareSource.connect() 
+0

Спасибо за ваш ответ. Проблема с «Будущим » заключается в том, что нет единого ответа ждать. Это поток ответов (0..n), и количество ответов неизвестно заранее. Кроме того, результаты должны быть отправлены сразу после их поступления, поэтому даже использование «Будущего <Список >» не помогло бы. – aha

+0

@aha OK. Он выглядит еще более реактивным. Вам необходимо реализовать 'Observable' для каждого веб-сервиса, а затем' flatMap' их напрямую. Как вы получаете несколько ответов от одной и той же службы? Вы делаете несколько запросов HTTP? –

+0

Нет прямой корреляции между количеством запросов, которые я делаю, и количеством полученных ответов. Одна служба обменивается данными через WebSockets, другая использует gRPC, что означает, что я должен заранее настроить каналы связи, а затем передать поток. – aha

Смежные вопросы