2015-03-29 3 views
1

Я пишу сервер NIO2, и мне нужно выполнить асинхронные операции чтения в AsynchronousSocketChannel, каждая из этих операций состоит из чтения целого числа и дальнейшего чтения из того же номера канала байтов, равного этому целому числу , Проблема в том, что когда я помещаю два или более компонента CompletionHandler на канал в строке (потому что есть запросы для нескольких операций чтения), и первый из этих обработчиков запускается, мой дальнейший код чтения в методе для обработчика первого обработчика работает должным образом, потому что второй обработчик запускается мгновенно, когда есть информация о канале. Как я могу заблокировать канал до следующего чтения complete() завершает без Future вещь? Я не могу использовать будущую причину Мне нужно поставить обработчик в сокет, а затем перейти к другим задачам.Обработчики параллельных обработок Java NIO2

for (final Map.Entry<String, AsynchronousSocketChannel> entry : ipSocketTable.entrySet()) { 
     try { 
      final AsynchronousSocketChannel outSocket = entry.getValue(); 
      synchronized (outSocket) { 
       final ByteBuffer buf = ByteBuffer.allocateDirect(9); 
       outSocket.read(buf, null, new DataServerResponseHandler(buf, outSocket, resultTable, server, entry.getKey())); 
      } 

     } catch (Exception e) { 

     } 
    } 

Вот DataServerResponseHandler класс:

class DataServerResponseHandler implements CompletionHandler<Integer, Void> { 

    private ConcurrentHashMap<String, Boolean> resultTable = null; 
    private AsynchronousSocketChannel channel = null; 
    private TcpServer server; 
    private String ip; 
    private ByteBuffer msg; 

    public DataServerResponseHandler(ByteBuffer msg, AsynchronousSocketChannel channel, 
      ConcurrentHashMap<String, Boolean> resultTable, TcpServer server, String ip) { 
     this.msg = msg; 
     this.channel = channel; 
     this.resultTable = resultTable; 
     this.server = server; 
     this.ip = ip; 
    } 

    @Override 
    public void completed(Integer result, Void attachment) { 
      try { 
       msg.rewind(); 
       int resultCode = msg.get() & 0xff; 
       int ipOne = msg.get() & 0xff; 
       int ipTwo = msg.get() & 0xff; 
       int ipThree = msg.get() & 0xff; 
       int ipFour = msg.get() & 0xff; 
       int length = msg.getInt(); 
       msg.rewind(); 
       ByteBuffer buf = ByteBuffer.allocateDirect(length); 
       channel.read(buf).get(); 
       buf.rewind(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 

     } 

    @Override 
    public void failed(Throwable exc, Void attachment) { 
     throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. 
    } 

} 

ответ

0

Этот код имеет несколько проблем.
Первое чтение не гарантирует, что он будет читать все оставшиеся байты, но вызывает обработчик завершения, как только он читает хотя бы один байт. Поэтому вам нужно проверить положение буфера и повторно вызвать чтение до тех пор, пока у вас не будет 9 байтов для заголовка или длины для полезной нагрузки.

Для второй части, чтобы продолжить асинхронный подход, снова запустите обработчик read с завершением. Вы можете создать новый, который специально обрабатывал бы полезную нагрузку или повторно использовал бы существующую, и вам нужно будет запомнить состояние:

switch (state) { 
case READ_HEADER: 
    if (msg.remaining() > 0) { 
     channel.read(msg, null, this); 
     return; 
    } 
    // do the parsing the IP and length 
    state = READ_PAYLOAD; 
    channel.read(payloadBuf, null, this); 
    break; 

case READ_PAYLOAD: 
    if (payloadBuf.remaining() > 0) { 
     channel.read(payloadBuf, null, this); 
     return; 
    } 
    payloadBuf.flip(); 
    // get content from payloadBuf 
    break; 
} 
Смежные вопросы