flatMap
и concatMap
являются основными инструментами для асинхронной цепочки.
Вам необходимо обернуть ваши веб-службы в Future
. И ваше аппаратное устройство в источник Observable
. Тогда же просто, как:
class WebServices {
Future<Response1> callService1(parameters) { ... }
Future<Response2> callService2(parameters) { ... }
}
hardwareSource
.flatMap(v -> Observable.fromFuture(callService1(...)))
.flatMap(r1 -> Observable.fromFuture(callService2(...)))
.subscribe(r2 -> System.out.println(r2));
В случае, если веб-сервисы получать и отправлять серию сообщений, которые они должны быть завернуты в Observable
с. И обработка трубопровода будет выглядеть так:
class WebServices {
Observable<Response1> sendToService1(parameters) { ... }
Observable<Response2> sendToService2(parameters) { ... }
}
hardwareSource
.flatMap(v -> sendToService1(...))
.flatMap(r1 -> sendToService2(...))
.subscribe(r2 -> System.out.println(r2));
И в случае входящих и исходящих потоков для веб-сервисов не строго коррелируют (ответы, непосредственно не связанных с запросами), то я бы осуществлять эти услуги как классы, разоблачающих как Observer
и Observable
интерфейсы ,
// wire them up
hardwareSource.getObservable()
.subscribe(webService1.getObserver());
webService1.getObservable()
.subscribe(webService2.getObserver());
webService2.getObservable()
.subscribe(resultHandler);
// initiate connections
webService2.connect()
webService1.connect()
hardwareSource.connect()
Спасибо за ваш ответ. Проблема с «Будущим» заключается в том, что нет единого ответа ждать. Это поток ответов (0..n), и количество ответов неизвестно заранее. Кроме того, результаты должны быть отправлены сразу после их поступления, поэтому даже использование «Будущего <Список >» не помогло бы. –
aha
@aha OK. Он выглядит еще более реактивным. Вам необходимо реализовать 'Observable' для каждого веб-сервиса, а затем' flatMap' их напрямую. Как вы получаете несколько ответов от одной и той же службы? Вы делаете несколько запросов HTTP? –
Нет прямой корреляции между количеством запросов, которые я делаю, и количеством полученных ответов. Одна служба обменивается данными через WebSockets, другая использует gRPC, что означает, что я должен заранее настроить каналы связи, а затем передать поток. – aha