Я создаю сервер, который использует команды из многочисленных источников, таких как JMS, SNMP, HTTP и т. Д. Все они асинхронны и работают нормально. Сервер поддерживает одно подключение к одному элементу устаревшего оборудования, которое имеет архитектуру «запрос/ответ» с пользовательским протоколом TCP. В идеале я хотел бы одну команду, как этот метод типа блокировкиСинтетический клиент Netty с асинхронными вызывающими абонентами
public Response issueCommandToLegacyHardware(Command command)
или этого асинхронного метода типа
public Future<Response> issueCommandToLegacyHardware(Command command)
Я сравнительно новым для Нетти и асинхронного программирования, в основном изучать его, как я иду вперед. Моя нынешняя мысль заключается в том, что мой класс LegacyHardwareClient
будет иметь public synchronized issueCommandToLegacyHardware(Command command)
, сделает запись на клиентский канал устаревшим аппаратным обеспечением, затем take()
с SynchronousQueue<Response>
, который будет блокироваться. ChannelInboundHandler в трубопроводе будет offer()
a Response
до SynchronousQueue>Response>
, который позволит take()
разблокировать и принять данные.
Это слишком запутанное? Есть ли примеры вокруг синхронных реализаций клиента Netty, на которые я могу смотреть? Есть ли лучшие методы для Netty? Я мог бы использовать только стандартные сокеты Java, однако способность Netty для анализа пользовательских протоколов наряду с простотой maintanability слишком велика, чтобы отказаться.
UPDATE: Что касается реализации, я использовал ArrayBlockingQueue <>(), и я использовал put() и remove(), а не offer() и remove(). Поскольку я хотел убедиться, что последующие запросы к устаревшему оборудованию были отправлены только тогда, когда на какие-либо активные запросы были отправлены ответы, поскольку устаревшее поведение аппаратного обеспечения неизвестно в противном случае.
Причина, по которой предложение() и remove() не работает для меня, заключалось в том, что команда offer() не пропускает ничего, если не было активно блокировки take() не запрашивать другую сторону. Обратное верно, что remove() ничего не вернет, если не будет блокировки ввода put(), вставляющей данные. Я не мог использовать put()/remove(), поскольку оператор remove() никогда не был достигнут, поскольку на канал не было запрошено письмо, чтобы вызвать событие, из которого будет вызываться remove(). Я не мог использовать предложение()/take(), поскольку оператор offer() вернет false, поскольку вызов take() еще не был выполнен. Используя ArrayBlockingQueue <>() с емкостью 1, он обеспечил выполнение только одной команды одновременно. Любые другие команды блокируются до тех пор, пока не будет достаточно места для вставки, с емкостью 1 это означает, что он должен быть пустым. Опорожнение очереди выполнялось после получения ответа от устаревшего оборудования. Это обеспечило хорошее синхронное поведение в отношении устаревшего оборудования, но предоставило асинхронный API пользователям устаревшего оборудования, для которого их много.
Не работает для меня. Как вы создаете объект 'Future'? Я попытался вернуть результат из вызова 'channel.write (command)', но это дает «Будущее ' (Использование Netty 4.1) –
@WimDeblauwe В коде вы должны вернуть обещание, которое было передано в аргументах (as обещание - это пример будущего) (только что понял, что в коде отсутствует код) – Ferrybig
Если вы используете 'remove()' вместо 'take()' вы не получите исключение NullPointerException, если элемент отсутствует? –