2015-03-10 4 views
4

У меня возникли трудности с использованием API Spring Reactor Stream (аналогично rxjava) для создания объекта ответа в моих ответах на обслуживание, предоставляемых двумя службами нисходящего потока.Терминальные вызовы к потоку никогда не выполняются

Ниже приведен метод accept() на моем канале. Некоторые имена были изменены, чтобы защитить невинные ..

 
@Override 
public void accept(final NetChannel channel) { 
    log.info("Consuming NetChannel of FullHttpRequest"); 
    try { 
     // Our initial Stream is a single HTTP request containing our XML document. 
     Stream requestStream = channel.in(); 

     requestStream.filter((x) -> {return true;}) 
       .dispatchOn(dispatcher) 
       .map(new FooRequestFunction())     // 1) 
       .flatMap(new BarRequestStreamFunction())   // 2) 
       .flatMap(new DownstreamRequestZipFunction())  // 3) 
       .toList()           // 4) 
       .onComplete(new ResponsesConsumer(channel));  // 5) 

    } catch (Exception e) { 
     log.error("Exception thrown during Channel processing", e); 
    } 
} 

Итак, FooRequest обручей много BarRequests, каждый из которых имеет один запрос, связанных Классифицировать и один связанный запрос Validate. Мы хотим: 1) конвертировать в FooRequest, 2) конвертировать FooRequest в серию BarRequests, 3) запускать два нисходящих запроса для каждого BarRequest, 4) объединить все наши объекты BarResponse в общий отклик, 5) отправить ответ назад для клиента.

Точка, с которой я сталкиваюсь с проблемами, это метод toList(), который никогда не выполняется. Каждый раз, когда я пытался что-то, что связано с Promise, он всегда кажется сломанным, и это не стало исключением.

FooRequestFunction, BarRequestStreamFunction довольно просты и, кажется, работают нормально. Их подпись методы являются:

// FooRequestFunction 
public FooRequest apply(final FullHttpRequest request); 

И:

// BarRequestStreamFunction 
public Stream<BarRequest> apply(FooRequest dsoRequests); 

DownstreamRequestZipFunction выглядит следующим образом:

 
@Override 
public Stream apply(BarRequest t) { 
    Stream classifyRes = Streams 
      .just(t) 
      .flatMap(new ClassifyDownstreamRequestFunction()); 

    Stream validateRes = Streams 
      .just(t) 
      .flatMap(new ValidateDownstreamRequestFunction()); 

    return Streams.zip(classifyRes, validateRes, (tuple) -> { 
     BarResponse response = new BarResponse(); 
     response.setClassifyRes(tuple.getT1()); 
     response.setValidateRes(tuple.getT2()); 
     return response; 
    }); 
} 

Это, кажется, работает хорошо, до тех пор, пока оба нисходящей функция запроса возвращает результат.

Наконец, потребитель в конце сцепленных вызовов имеет эту подпись:

// ResponsesConsumer 
public void accept(Promise<List<BarResponse>> responses) 

Что это будет ждать() ответы обещают, а затем агрегирует все эти ответы в единый XML-документ, написанный назад к каналу. Я могу сказать, что выполнение никогда не доходит до этого метода, потому что ни один из журналов не срабатывает. Кажется, все останавливается на .toList().

Кто-нибудь знает, почему эта настройка когда-либо кажется выполненной toList() или что-нибудь после этого?

EDIT: Хорошо, у меня есть немного больше информации. После предоставления соглашения об именах каждому потоку приложения, чтобы облегчить отладку, я вижу, что «shared-1», поток, который запускает метод accept(), входит в состояние WAITING, а затем остается там. Это может быть связано с тем, что базовый диспетчер является диспетчером ringbuffer, который является однопоточным.

Я изменил код так, чтобы подход был несколько иным, и использовал многопоточный диспетчер, и избегал использования Promise, но у меня все еще есть состояние, в котором хвост связанного набора вызовов не будет выполняться. Смотрите ниже:

 
@Override 
public void accept(final NetChannel channel) { 
    log.info("Consuming NetChannel of FullHttpRequest"); 
    try { 
     // Our initial Stream is a single HTTP request containing our XML document. 
     Stream requestStream = channel.in(); 

     requestStream.filter((x) -> {return true;}) 
       .dispatchOn(dispatcher) 
       .map(new FooRequestFunction())     // 1) 
       .flatMap(new BarRequestStreamFunction())   // 2) 
       .flatMap(new DownstreamRequestZipFunction())  // 3) 
       .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); return list;})          // 4) 
       .consumeOn((x)->{log.info("Consume");}, (x)->{log.error("error");}, (x)->{log.info("Complete");}, dispatcher);  // 5) 

    } catch (Exception e) { 
     log.error("Exception thrown during Channel processing", e); 
    } 
} 

В выше, я заменил ToList() с призывом уменьшить() и разрушилась все в одном List<BarResponse>. Я вижу, что это исполнение и ведение журнала просто прекрасны.Однако, независимо от того, что я делаю с последним вызовом, после tryying(), consumeOn() и т. Д. - он никогда не выполняется и никогда не регистрирует окончательные вызовы, которые вы видите выше.

В VisualVM я вижу, что потоки диспетчера все ждут на одном объектном мониторе, связанном с блокирующей очередью - другими словами, они все ждут работы. Это подобно вызову tail consumeOn(), который полностью игнорируется.

Что я здесь делаю неправильно? Что я не понимаю?

EDIT 2: Учитывая, что Джонс ответил ниже, я подозреваю, что проблема связана с настройкой сервера. Возможно только для реактора выпуск 2.0.0.M2, который настраивается в главном Application класса следующим образом:

 
@Bean 
    public NetServer httpServer(
      final Environment env, 
      final MetricRegistry metrics, 
      final ChannelConsumer consumer) throws InterruptedException { 

     NetServer server = new TcpServerSpec(
       NettyTcpServer.class) 
       .env(env) 
       .options(
         new NettyServerSocketOptions() 
          .pipelineConfigurer((ChannelPipeline pipeline) -> pipeline 
           .addLast(new HttpServerCodec()) 
           .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH)))) 
       .consume(consumer) 
       .get(); 

     server.start().await(); 

     return server; 
    } 

Нет диспетчер не настроен для этого, и это, кажется, используя дезинтегратор Lmax под капотом, а не a NettyEventLoopDispatcher. Непонятно, как настроить NettyEventLoopDispatcher и использовать его в качестве диспетчера замены.

+0

есть достаточный объем диспетчера? – harshtuna

+0

Да. Я тестирую его с помощью одного запроса Http. Он зависает, как только он попадает в toList() – Jon

+0

Просто быстрый комментарий к вышесказанному - похоже, что это какая-то проблема с Netty. Мы пробовали эквивалентную программу с использованием RxJava/RxNetty и имели очень схожую проблему. Кроме того, мы не смогли больше узнать, так как у нас не хватило времени для выполнения этой задачи. – Jon

ответ

0

Звонок consumeOn() избыточен, потому что вы уже на нем Dispatcher (если только ваш «настоящий» код не использует этот пример). В любом случае, когда вы пишете выход, он будет переключаться на NettyEventLoopDispatcher.

Я сделал быструю проверку, чтобы убедиться, что этот поток работы за пределами TcpServer и это работает, как ожидалось:

@Test 
public void testStream() throws InterruptedException { 
    Stream<String> s1 = Streams.just("Hello World!"); 

    s1.filter(s -> s.startsWith("Hello")) 
     .dispatchOn(Environment.sharedDispatcher()) 
     .map(s -> s.toUpperCase()) 
     .flatMap(s -> Streams.just(s, s)) 
     .flatMap(s -> Streams.just(s, s)) 
     .reduce(new ArrayList<>(), (l, s) -> { 
      l.add(s); 
      return l; 
     }) 
     .consume(l -> { 
      System.out.println("thread: " + Thread.currentThread() + ", l: " + l); 
     }); 

    Thread.sleep(500); 
} 

Было бы интересно узнать, если поток работает на потоке Нетти просто закомментировать вызов .dispatchOn().

+0

Пробовал это раньше. Случается, что поток, помеченный как «shared-2», заканчивает регистрацию всего до потребителя, затем переходит в состояние WAITING, а потребитель не выполняет. Дамп дат показывает, что он ждет на мониторе объекта для очереди. Из трассировки стека на дампе потока это похоже на использование LMAX-подрывника под капотом. Это насколько я могу получить. – Jon

+0

Модификация consumeOn() для использования() не имеет никакого эффекта, я уже пробовал это тоже. – Jon

Смежные вопросы