Я пытаюсь создать сервер имитации, чтобы упростить запись тестов для клиента, который работает с потоком событий.Как использовать TestScheduler с HTTP-сервером RxNetty
Я реализую сервер и клиент с RxNetty, мой вопрос в том, как я могу использовать TestScheduler с RxNetty для управления, когда происходят события.
Вот моя (упрощенный) Сервер:
final Observable<Event> events = Observable.just(...);
final TestScheduler testScheduler = new TestScheduler();
final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, (request, response) -> {
// [snip] validate request...
return request.getContent().flatMap(buf ->
events
.zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, testScheduler), (e, t) -> e)
.map(Encoder::eventToBytes)
.flatMap(response::writeBytesAndFlush)
);
});
server.start();
Однако, когда я указываю, что interval
следует использовать testScheduler
сервер никогда не посылает никаких событий. Если я удалю testScheduler
или Schedulers.computation()
или Schedulers.io()
, сервер сможет отправлять события.
Вот на стороне клиента/использование, что ожидает получить события, поступающие от сервера:
final int serverPort = server.getServerPort();
final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort));
final HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort())
.pipelineConfigurator(new HttpClientPipelineConfigurator<>())
.build();
final Event event = new Event("some event")
final TestScheduler clientReceiveScheduler = new TestScheduler();
final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath())
.withHeader("Content-Type", "application/x-protobuf")
.withHeader("Accept", "application/x-protobuf")
.withContent(event.toByteArray());
final Observable<byte[]> observable = client.submit(request)
.flatMap(AbstractHttpContentHolder::getContent)
.map(BufUtils::bufToBytes)
.zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, clientReceiveScheduler), (b, l) -> b)
.doOnNext((b) -> LOGGER.info("Received bytes: {}", Arrays.toString(b)));
final TestSubscriber<Event> sub = new TestSubscriber<>();
observable.subscribe(sub);
testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
clientReceiveScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
sub.getOnNextEvents()
.stream()
.forEach(System.out::println);
Вот ссылка на мой full server и test case пытается использовать сервер.
Спасибо @ user2607715 Я был в состоянии получить материал интервала для работы с планировщиком тестов аналогичным образом, описанным здесь, я даже получил его для работы с RxNetty для получения потока событий. Тем не менее, я не смог заставить его работать с сервером, отправляющим сообщения. Я обновлю свой вопрос, чтобы показать, что я делаю на стороне клиента. –
@BenWhitehead проблема с кодом заключается в том, что между запросом, полученным сервером, и временем выполнения 'testScheduler' существует неотъемлемое условие гонки. Чтобы доказать эту гипотезу, если вы добавили сон перед продвижением планировщика тестов, вы увидите, что получите ответ, как ожидалось. –
Спасибо @Nitesh Кант Я рассмотрю, что я могу сделать, чтобы обойти это. Я неправильно догадался, что на серверные потоки не повлияет testScheduler. –