2016-01-19 4 views
2

Я создаю сервер, который использует команды из многочисленных источников, таких как JMS, SNMP, HTTP и т. Д. Все они асинхронны и работают нормально. Сервер поддерживает одно подключение к одному элементу устаревшего оборудования, которое имеет архитектуру «запрос/ответ» с пользовательским протоколом TCP. В идеале я хотел бы одну команду, как этот метод типа блокировкиСинтетический клиент Netty с асинхронными вызывающими абонентами

public Response issueCommandToLegacyHardware(Command command) 

или этого асинхронного метода типа

public Future<Response> issueCommandToLegacyHardware(Command command) 

Я сравнительно новым для Нетти и асинхронного программирования, в основном изучать его, как я иду вперед. Моя нынешняя мысль заключается в том, что мой класс LegacyHardwareClient будет иметь public synchronized issueCommandToLegacyHardware(Command command), сделает запись на клиентский канал устаревшим аппаратным обеспечением, затем take() с SynchronousQueue<Response>, который будет блокироваться. ChannelInboundHandler в трубопроводе будет offer() a Response до SynchronousQueue>Response>, который позволит take() разблокировать и принять данные.

Это слишком запутанное? Есть ли примеры вокруг синхронных реализаций клиента Netty, на которые я могу смотреть? Есть ли лучшие методы для Netty? Я мог бы использовать только стандартные сокеты Java, однако способность Netty для анализа пользовательских протоколов наряду с простотой maintanability слишком велика, чтобы отказаться.

UPDATE: Что касается реализации, я использовал ArrayBlockingQueue <>(), и я использовал put() и remove(), а не offer() и remove(). Поскольку я хотел убедиться, что последующие запросы к устаревшему оборудованию были отправлены только тогда, когда на какие-либо активные запросы были отправлены ответы, поскольку устаревшее поведение аппаратного обеспечения неизвестно в противном случае.

Причина, по которой предложение() и remove() не работает для меня, заключалось в том, что команда offer() не пропускает ничего, если не было активно блокировки take() не запрашивать другую сторону. Обратное верно, что remove() ничего не вернет, если не будет блокировки ввода put(), вставляющей данные. Я не мог использовать put()/remove(), поскольку оператор remove() никогда не был достигнут, поскольку на канал не было запрошено письмо, чтобы вызвать событие, из которого будет вызываться remove(). Я не мог использовать предложение()/take(), поскольку оператор offer() вернет false, поскольку вызов take() еще не был выполнен. Используя ArrayBlockingQueue <>() с емкостью 1, он обеспечил выполнение только одной команды одновременно. Любые другие команды блокируются до тех пор, пока не будет достаточно места для вставки, с емкостью 1 это означает, что он должен быть пустым. Опорожнение очереди выполнялось после получения ответа от устаревшего оборудования. Это обеспечило хорошее синхронное поведение в отношении устаревшего оборудования, но предоставило асинхронный API пользователям устаревшего оборудования, для которого их много.

ответ

4

Вместо разработки приложения на блокирующим образом с использованием SynchronousQueue<Response>, его дизайн в неблокируемом способом с использованием SynchronousQueue<Promise<Response>>.

Ваш public Future<Response> issueCommandToLegacyHardware(Command command) должен затем использовать offer() добавить DefaultPromise<>() в очереди, а затем Нетти трубопровод может использовать remove(), чтобы получить ответ на этот запрос, обратите внимание, я использовал remove() вместо take(), так как только в исключительных случаях, есть ни один элемент не присутствует.

Быстрая реализация этого может быть:

public class MyLastHandler extends SimpleInboundHandler<Response> { 
    private final SynchronousQueue<Promise<Response>> queue; 

    public MyLastHandler (SynchronousQueue<Promise<Response>> queue) { 
     super(); 
     this.queue = queue; 
    } 

    // The following is called messageReceived(ChannelHandlerContext, Response) in 5.0. 
    @Override 
    public void channelRead0(ChannelHandlerContext ctx, Response msg) { 
     this.queue.remove().setSuccss(msg); // Or setFailure(Throwable) 
    } 
} 

выше обработчик должен быть помещен последним в цепочке.

Реализация public Future<Response> issueCommandToLegacyHardware(Command command) может выглядеть:

Channel channel = ....; 
SynchronousQueue<Promise<Response>> queue = ....; 

public Future<Response> issueCommandToLegacyHardware(Command command) { 
    return issueCommandToLegacyHardware(command, channel.eventLoop().newPromise()); 
} 

public Future<Response> issueCommandToLegacyHardware(Command command, Promise<Response> promise) { 
    queue.offer(promise); 
    channel.write(command); 
    return promise; 
} 

Использование подхода с перегрузкой на issueCommandToLegacyHardware также шаблон проектирования используется для Channel.write, это делает его очень flexable.

Этого дизайн-шаблон может использоваться следующим образом в коде клиента:

issueCommandToLegacyHardware(
    Command.TAKE_OVER_THE_WORLD_WITH_FIRE, 
    channel.eventLoop().newPromise() 
).addListener(
    (Future<Response> f) -> { 
     System.out.println("We have taken over the world: " + f.get()); 
    } 
); 

Преимущества этого шаблона является то, что не ненужной блокировка не используется в любом месте, просто логик асинхронного.

Приложение I: Javadoc:

PromiseFutureDefaultPromise

+0

Не работает для меня. Как вы создаете объект 'Future '? Я попытался вернуть результат из вызова 'channel.write (command)', но это дает «Будущее ' (Использование Netty 4.1) –

+0

@WimDeblauwe В коде вы должны вернуть обещание, которое было передано в аргументах (as обещание - это пример будущего) (только что понял, что в коде отсутствует код) – Ferrybig

+0

Если вы используете 'remove()' вместо 'take()' вы не получите исключение NullPointerException, если элемент отсутствует? –

Смежные вопросы