2015-12-22 2 views
0

Я пытаюсь создать сервер имитации, чтобы упростить запись тестов для клиента, который работает с потоком событий.Как использовать TestScheduler с HTTP-сервером RxNetty

Я реализую сервер и клиент с RxNetty, мой вопрос в том, как я могу использовать TestScheduler с RxNetty для управления, когда происходят события.

Вот моя (упрощенный) Сервер:

final Observable<Event> events = Observable.just(...); 
    final TestScheduler testScheduler = new TestScheduler(); 
    final HttpServer<ByteBuf, ByteBuf> server = RxNetty.createHttpServer(0, (request, response) -> { 

     // [snip] validate request... 

     return request.getContent().flatMap(buf -> 
      events 
       .zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, testScheduler), (e, t) -> e) 
       .map(Encoder::eventToBytes) 
       .flatMap(response::writeBytesAndFlush) 
     ); 
    }); 
    server.start(); 

Однако, когда я указываю, что interval следует использовать testScheduler сервер никогда не посылает никаких событий. Если я удалю testScheduler или Schedulers.computation() или Schedulers.io(), сервер сможет отправлять события.

Вот на стороне клиента/использование, что ожидает получить события, поступающие от сервера:

final int serverPort = server.getServerPort(); 
    final URI uri = URI.create(String.format("http://localhost:%d/api/v1/scheduler", serverPort)); 
    final HttpClient<ByteBuf, ByteBuf> client = RxNetty.<ByteBuf, ByteBuf>newHttpClientBuilder(uri.getHost(), uri.getPort()) 
     .pipelineConfigurator(new HttpClientPipelineConfigurator<>()) 
     .build(); 

    final Event event = new Event("some event") 

    final TestScheduler clientReceiveScheduler = new TestScheduler(); 
    final HttpClientRequest<ByteBuf> request = HttpClientRequest.createPost(uri.getPath()) 
     .withHeader("Content-Type", "application/x-protobuf") 
     .withHeader("Accept", "application/x-protobuf") 
     .withContent(event.toByteArray()); 

    final Observable<byte[]> observable = client.submit(request) 
     .flatMap(AbstractHttpContentHolder::getContent) 
     .map(BufUtils::bufToBytes) 
     .zipWith(Observable.interval(10, TimeUnit.MILLISECONDS, clientReceiveScheduler), (b, l) -> b) 
     .doOnNext((b) -> LOGGER.info("Received bytes: {}", Arrays.toString(b))); 

    final TestSubscriber<Event> sub = new TestSubscriber<>(); 
    observable.subscribe(sub); 

    testScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); 
    clientReceiveScheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); 
    sub.getOnNextEvents() 
     .stream() 
     .forEach(System.out::println); 

Вот ссылка на мой full server и test case пытается использовать сервер.

ответ

0

TestScheduler требует, чтобы вы продвигали время, используя его методы advanceTime* для запуска любых запланированных действий. Observable.interval() планирует действие для предоставленного планировщика, который в этом случае является TestScheduler. Без продвижения времени в TestScheduler он никогда не выполнит действия; галочкой Observable.interval() в этом случае.

Удаление RxNetty, если вы посмотрите на этот код:

TestScheduler scheduler = Schedulers.test(); 
    TestSubscriber<Long> subscriber = new TestSubscriber<>(); 
    Observable.interval(1, TimeUnit.MILLISECONDS, scheduler) 
       .take(1) 
       .subscribe(subscriber); 

    scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); 

    System.out.println(subscriber.getOnNextEvents().size()); 

Выход всегда будет 1. Ото, если вы закомментировать scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); это всегда будет 0.

+0

Спасибо @ user2607715 Я был в состоянии получить материал интервала для работы с планировщиком тестов аналогичным образом, описанным здесь, я даже получил его для работы с RxNetty для получения потока событий. Тем не менее, я не смог заставить его работать с сервером, отправляющим сообщения. Я обновлю свой вопрос, чтобы показать, что я делаю на стороне клиента. –

+0

@BenWhitehead проблема с кодом заключается в том, что между запросом, полученным сервером, и временем выполнения 'testScheduler' существует неотъемлемое условие гонки. Чтобы доказать эту гипотезу, если вы добавили сон перед продвижением планировщика тестов, вы увидите, что получите ответ, как ожидалось. –

+0

Спасибо @Nitesh Кант Я рассмотрю, что я могу сделать, чтобы обойти это. Я неправильно догадался, что на серверные потоки не повлияет testScheduler. –

Смежные вопросы