У меня возникли трудности с использованием API Spring Reactor Stream (аналогично rxjava) для создания объекта ответа в моих ответах на обслуживание, предоставляемых двумя службами нисходящего потока.Терминальные вызовы к потоку никогда не выполняются
Ниже приведен метод accept()
на моем канале. Некоторые имена были изменены, чтобы защитить невинные ..
@Override public void accept(final NetChannel channel) { log.info("Consuming NetChannel of FullHttpRequest"); try { // Our initial Stream is a single HTTP request containing our XML document. Stream requestStream = channel.in(); requestStream.filter((x) -> {return true;}) .dispatchOn(dispatcher) .map(new FooRequestFunction()) // 1) .flatMap(new BarRequestStreamFunction()) // 2) .flatMap(new DownstreamRequestZipFunction()) // 3) .toList() // 4) .onComplete(new ResponsesConsumer(channel)); // 5) } catch (Exception e) { log.error("Exception thrown during Channel processing", e); } }
Итак, FooRequest
обручей много BarRequests
, каждый из которых имеет один запрос, связанных Классифицировать и один связанный запрос Validate. Мы хотим: 1) конвертировать в FooRequest
, 2) конвертировать FooRequest
в серию BarRequests
, 3) запускать два нисходящих запроса для каждого BarRequest
, 4) объединить все наши объекты BarResponse
в общий отклик, 5) отправить ответ назад для клиента.
Точка, с которой я сталкиваюсь с проблемами, это метод toList()
, который никогда не выполняется. Каждый раз, когда я пытался что-то, что связано с Promise
, он всегда кажется сломанным, и это не стало исключением.
FooRequestFunction
, BarRequestStreamFunction
довольно просты и, кажется, работают нормально. Их подпись методы являются:
// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);
И:
// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);
DownstreamRequestZipFunction
выглядит следующим образом:
@Override public Stream apply(BarRequest t) { Stream classifyRes = Streams .just(t) .flatMap(new ClassifyDownstreamRequestFunction()); Stream validateRes = Streams .just(t) .flatMap(new ValidateDownstreamRequestFunction()); return Streams.zip(classifyRes, validateRes, (tuple) -> { BarResponse response = new BarResponse(); response.setClassifyRes(tuple.getT1()); response.setValidateRes(tuple.getT2()); return response; }); }
Это, кажется, работает хорошо, до тех пор, пока оба нисходящей функция запроса возвращает результат.
Наконец, потребитель в конце сцепленных вызовов имеет эту подпись:
// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)
Что это будет ждать() ответы обещают, а затем агрегирует все эти ответы в единый XML-документ, написанный назад к каналу. Я могу сказать, что выполнение никогда не доходит до этого метода, потому что ни один из журналов не срабатывает. Кажется, все останавливается на .toList().
Кто-нибудь знает, почему эта настройка когда-либо кажется выполненной toList()
или что-нибудь после этого?
EDIT: Хорошо, у меня есть немного больше информации. После предоставления соглашения об именах каждому потоку приложения, чтобы облегчить отладку, я вижу, что «shared-1», поток, который запускает метод accept(), входит в состояние WAITING, а затем остается там. Это может быть связано с тем, что базовый диспетчер является диспетчером ringbuffer, который является однопоточным.
Я изменил код так, чтобы подход был несколько иным, и использовал многопоточный диспетчер, и избегал использования Promise
, но у меня все еще есть состояние, в котором хвост связанного набора вызовов не будет выполняться. Смотрите ниже:
@Override public void accept(final NetChannel channel) { log.info("Consuming NetChannel of FullHttpRequest"); try { // Our initial Stream is a single HTTP request containing our XML document. Stream requestStream = channel.in(); requestStream.filter((x) -> {return true;}) .dispatchOn(dispatcher) .map(new FooRequestFunction()) // 1) .flatMap(new BarRequestStreamFunction()) // 2) .flatMap(new DownstreamRequestZipFunction()) // 3) .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); return list;}) // 4) .consumeOn((x)->{log.info("Consume");}, (x)->{log.error("error");}, (x)->{log.info("Complete");}, dispatcher); // 5) } catch (Exception e) { log.error("Exception thrown during Channel processing", e); } }
В выше, я заменил ToList() с призывом уменьшить() и разрушилась все в одном List<BarResponse>
. Я вижу, что это исполнение и ведение журнала просто прекрасны.Однако, независимо от того, что я делаю с последним вызовом, после tryying(), consumeOn() и т. Д. - он никогда не выполняется и никогда не регистрирует окончательные вызовы, которые вы видите выше.
В VisualVM я вижу, что потоки диспетчера все ждут на одном объектном мониторе, связанном с блокирующей очередью - другими словами, они все ждут работы. Это подобно вызову tail consumeOn(), который полностью игнорируется.
Что я здесь делаю неправильно? Что я не понимаю?
EDIT 2: Учитывая, что Джонс ответил ниже, я подозреваю, что проблема связана с настройкой сервера. Возможно только для реактора выпуск 2.0.0.M2, который настраивается в главном Application
класса следующим образом:
@Bean public NetServer httpServer( final Environment env, final MetricRegistry metrics, final ChannelConsumer consumer) throws InterruptedException { NetServer server = new TcpServerSpec( NettyTcpServer.class) .env(env) .options( new NettyServerSocketOptions() .pipelineConfigurer((ChannelPipeline pipeline) -> pipeline .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH)))) .consume(consumer) .get(); server.start().await(); return server; }
Нет диспетчер не настроен для этого, и это, кажется, используя дезинтегратор Lmax под капотом, а не a NettyEventLoopDispatcher
. Непонятно, как настроить NettyEventLoopDispatcher
и использовать его в качестве диспетчера замены.
есть достаточный объем диспетчера? – harshtuna
Да. Я тестирую его с помощью одного запроса Http. Он зависает, как только он попадает в toList() – Jon
Просто быстрый комментарий к вышесказанному - похоже, что это какая-то проблема с Netty. Мы пробовали эквивалентную программу с использованием RxJava/RxNetty и имели очень схожую проблему. Кроме того, мы не смогли больше узнать, так как у нас не хватило времени для выполнения этой задачи. – Jon