Во-первых, я объясню ситуацию и логику, которую я пытаюсь реализовать:Нетти клиент несколько запросов
У меня есть несколько потоков, каждый положил результат его работы, какой-то объект называется
Result
в очередьQueueToSend
NettyClient
Мои пробегов в потоке и принимаетResult
отQueueToSend
каждый 1 milisecond и должны подключаться к серверу и отправить сообщение, которое создается изResult
. Мне также нужно, чтобы эти соединения были асинхронными. Поэтому мне нужен списокResult
, который должен быть известенNettyHandler
, чтобы отправить правильное сообщение и обработать правый результат, а затем снова отправить ответ.
Так что я инициализировать NettyClient
самозагрузки
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
и устанавливает трубопровод один раз при запуске приложения. Затем каждый milisecond я беру Result
объект из QueueToSend
и подключиться к серверу
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,port);
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
Я решил использовать статический ConcurrentHashMap
для сохранения каждого результата объекта, взятый из QueueToSend
, ассоциированного с каналом.
Первая проблема имеет место в NettyHandler
в методе channelConnected, когда я пытаюсь принять Result
объект, связанный с каналом от ResultConcurrentHashMap
.
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel channel = ctx.getPipeline.getChannel();
Result result = ResultConcurrentHashMap.get(channel.getId());
}
Но иногда result
равно нулю (1 из 50), даже подумал, что это должно быть в ResultConcurrentHashMap
. Я думаю, что это произойдет, причину того, что channelConnected
события происходит до NettyClient
пробегов этого кода:
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
Может быть, это не будет выглядеть, если я бег NettyServer
и NettyClient
не на локальном и другое, но удалено, он будет принимать moretime к estabilish соединения , Но мне нужно решение для этой проблемы.
Другая проблема заключается в том, что я отправляю сообщения каждые 1 миллисекунду асинхронно, и я полагаю, что сообщения могут быть смешанными, и сервер не может их правильно прочитать. Если я запускаю их один за другим, это будет нормально:
future.getChannel().getCloseFuture().awaitUninterruptibly();
Но мне нужно asynchromus посылать и обрабатывать правильные результаты, ассоциированные с каналом и отправки ответов. Что мне следует реализовать?
Мой NettyClient принимает объект результата из очереди каждого 1 миллисекунды, поэтому результат изменение объекта в Нетти клиента каждый 1, мс, если соединение Cannel появится в 2 миллисекундах, у меня будет другой результат объект ассоциированного с этим каналом? Или нет? – BraginiNI
Я предполагаю, что вам нужно обмениваться данными между потоками (производителем результатов, Netty client) с использованием блокирующей очереди, а не хэш-карты. После вызова bootstrap.connect() создайте BlockingQueue как локальный канал. обработчик может искать очередь, когда клиент готов отправить и опросить очередь на каждые 1 мс. от производителя результатов, просто предложите данные. Никакие данные не будут потеряны из-за задержки запуска нетто-клиента. –
Спасибо за ваш повтор. Но в моем случае это не сработает. У меня есть большой объем продукта-получателя и вам нужно поддерживать асинхронные каналы, отправляя сообщения как можно больше. Теперь я на 90% готов сказать, что смешение кадров - проблема сервера. Ваш первый ответ помог решить первую проблему, поэтому я принимаю ваш ответ. благодаря – BraginiNI