Я пытаюсь закрепить на наблюдаемые с помощью Vert.x и RxJava. Я не знаю, не понимаю ли я что-то, или это просто какая-то ошибка. Вот код.Vert.x - RxJava - zip observables
public class BusVerticle extends Verticle {
public void start() {
final RxVertx rxVertx = new RxVertx(vertx);
Observable<RxMessage<JsonObject>> bus = rxVertx.eventBus().registerHandler("busName");
Observable<RxHttpClientResponse> httpResponse = bus.mapMany(new Func1<RxMessage<JsonObject>, Observable<RxHttpClientResponse>>() {
public Observable<RxHttpClientResponse> call(RxMessage<JsonObject> rxMessage) {
RxHttpClient rxHttpClient = rxVertx.createHttpClient();
rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
return rxHttpClient.getNow("/uri");
}
});
Observable<RxMessage<JsonObject>> zipObservable = Observable.zip(bus, httpResponse, new Func2<RxMessage<JsonObject>, RxHttpClientResponse, RxMessage<JsonObject>>() {
public RxMessage<JsonObject> call(RxMessage<JsonObject> rxMessage, RxHttpClientResponse rxHttpClientResponse) {
return rxMessage;
}
});
zipObservable.subscribe(new Action1<RxMessage<JsonObject>>() {
public void call(RxMessage<JsonObject> rxMessage) {
rxMessage.reply();
}
});
}
}
Я хочу, чтобы сделать запрос HTTP, используя информацию из принятого сообщения, а затем пронестись как наблюдаемые, автобус событий и ответ HTTP, чтобы ответить на сообщение с информацией из ответа HTTP.
Я не получаю ответа на сообщение, в котором я его отправляю.
Заранее благодарен!