2013-02-20 2 views
0

Я пишу программу для тестирования API-интерфейсов Java (старый io vs nio vs nio2).Мой код сокета Java развращает данные

У меня есть сервер, который посылает только два значения:

  1. System.nanoTime()
  2. счетчик, который подсчитывает количество отправленных сообщений.

Клиент получает эти данные, сравнивает удаленный System.nanoTime() с локальной меткой времени для вычисления задержки и проверяет счетчик, чтобы убедиться, что данные не были удалены.

Поскольку это всего лишь тест, сервер и клиент работают в одной JVM. 90% данных передаются правильно; однако время от времени временная метка полностью ошибочна. Похоже, что это может быть ошибка over/underflow, но я не вижу, как это можно было бы ввести. Ниже приведен пример ошибки:

ERROR: counter 3, remoteTS -8267580102784516096, localTS 155321716184402, diff 8267735424500700498

Обратите внимание, что локальная временная метка 155321716184402 переводится немного после 7 часов вечера. Но удаленная временная метка просто отрицательна! Если вы посмотрите в коде, я не буду придумывать математику, но это не может быть отрицательным. Я также не вижу, как я могу получить ошибку переполнения. Я думал, что это может быть связано с большим против маленького endian, но тогда каждая ценность будет неправильной, а не только некоторыми из них.

код (который извлекается из немного большего теста) выглядит следующим образом:

package networkioshootout; 

import static java.lang.System.out; 

import java.io.DataOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.net.InetAddress; 
import java.net.ServerSocket; 
import java.net.Socket; 
import java.net.UnknownHostException; 
import java.nio.ByteBuffer; 
import java.util.Date; 
import java.util.Random; 
import java.util.concurrent.ExecutionException; 


public class DebugNetwork { 
    private final static int SENDCOUNT = 100; 
    private final static int PORT = 9000; 
    private final static int TESTLOOP = 10; 
    private final static Random rn = new Random(); 

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { 
     long currentNanos = System.nanoTime(); 
     long currentMillis = System.currentTimeMillis(); 
     Date now = new Date(); 
     System.out.println(String.format("Current date/time:%s, nanos:%s, millis:%s", 
       now, currentNanos, currentMillis)); 

     //Server 
     new Server().start(); 

     //Client 
     for(int i=0; i< TESTLOOP; i++){ 
      final int DATASIZE = (1+rn.nextInt(99))*8; 
      clientInputstream(DATASIZE); 
     } 
    } 

    private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException { 
     final byte[] internalBuffer = new byte[bufferSize+16] ; 
     final ByteBuffer longExtractor = ByteBuffer.allocate(16); 

     int bytesReadSoFar = 0; 
     long counter = 0; 

     Socket client = new Socket(InetAddress.getLocalHost(), PORT); 
     InputStream in = client.getInputStream(); 

     byte[] data = new byte[bufferSize]; 
     int size = 0; 

     try{ 
      while(-1 != (size = in.read(data))){ 
       for(int i=0; i < size; i++){ 
        internalBuffer[i+bytesReadSoFar] = data[i]; 
       } 
       bytesReadSoFar += size; 

       if(bytesReadSoFar >= 16){ 
        int values = bytesReadSoFar/16; 
        int toRead = values; 
        int remainder = bytesReadSoFar % 16; 

        for(int i=0; i< toRead; i++){ 
         int j = i * 16; 

         //long remoteTS = ByteBuffer.wrap(new byte[]{internalBuffer[j+0],internalBuffer[j+1],internalBuffer[j+2],internalBuffer[j+3],internalBuffer[j+4],internalBuffer[j+5],internalBuffer[j+6],internalBuffer[j+7]}).getLong(); 
         //long remoteCounter = ByteBuffer.wrap(new byte[]{internalBuffer[j+8],internalBuffer[j+9],internalBuffer[j+10],internalBuffer[j+11],internalBuffer[j+12],internalBuffer[j+13],internalBuffer[j+14],internalBuffer[j+15]}).getLong(); 

         //long remoteTS = data[0] | ((int)(data[1]) << 4) | ((int)(data[1]) << 8) | ((int)(data[1]) << 12) | ((int)(data[1]) << 16) | ((int)(data[1]) << 20) | ((int)(data[1]) << 24) ; 

         longExtractor.put(internalBuffer, j, 16); 
         longExtractor.flip(); 
         long remoteTS = longExtractor.getLong(); 
         long remoteCounter = longExtractor.getLong(); 
         longExtractor.clear(); 

         if(remoteCounter != counter){ 
          String error = "ERROR: Expected remote counter to be "+counter+" but it was actually "+remoteCounter; 
          //System.out.println(error); 
          throw new RuntimeException(error); 
         } 
         counter++; 

         long localTS = System.nanoTime(); 
         long latency = localTS - remoteTS; 
         if(Math.abs(latency) > 1200000000) { 
          out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s", 
            counter, remoteTS, localTS, latency)); 
          continue; 
         } 


        } 

        //System.arraycopy(data, toRead, data, 0, remainder); 
        for(int i=0; i < remainder; i++){ 
         internalBuffer[i] = internalBuffer[i+toRead]; 
        } 
        bytesReadSoFar = remainder; 
       } 
      } 
     } 
     finally{ 
      client.close(); 
     } 
    } 

    static final class Server extends Thread{ 

     public void run(){ 
      try { 
       startServer(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 

     private static void startServer() throws IOException { 
      final ServerSocket server = new ServerSocket(PORT); 

      //System.out.println("Server listening on port "+PORT); 

      while(true){ 
       final Socket c1 = server.accept(); 
       c1.setTcpNoDelay(true); 
       //System.out.println("Client connected"); 
       new Thread(new Runnable() { 

        @Override 
        public void run() { 
         long totalMsgs = 0; 
         long counter = 0; 
         DataOutputStream serverout; 
         try { 
          serverout = new DataOutputStream(c1.getOutputStream()); 
          for(int i=0;i<SENDCOUNT;i++){ 
           serverout.writeLong(System.nanoTime()); 
           serverout.writeLong(counter); 
           totalMsgs++; 
           counter++; 
          } 
          //System.out.println("Sent bytes to client: "+total); 
         } catch (IOException e) { 
          out.println("Messages sent:"+totalMsgs+", current counter:"+counter); 
          e.printStackTrace(); 
         } 
         finally{ 
          //System.out.println("Client disconnected when counter was "+counter); 
          try { c1.close(); } catch (IOException e) { e.printStackTrace();} 
         } 
        } 
       }).start(); 
      } 
     } 
    } 

} 

EDIT: Поскольку есть некоторые замечания по этому поводу, фактическая программа имеет клиентов, подключенных к серверу через вход поток, буферный поток, NIO, NIO2. Это более полная (но устаревшая) версия программы: https://gist.github.com/falconair/4975243

Мне еще нужно добавить поток данных, экспериментировать с параметрами сокета и т. Д. Я хотел бы решить проблему с повреждением данных до того, как я двигаться дальше.

+0

Что произойдет, если 'bytesReadSoFar' есть 40? – parsifal

+0

Скорее всего, это намного проще закодировать с DataInputStream.readLong(), чем все это ручное кодирование, ByteBuffers и т. Д., И это, вероятно, избавит вас от ваших ошибок. – EJP

+0

@ EJP с использованием входного потока на клиенте является частью теста. Я также использую буферизованные потоки, селектора, async и т. Д., Все из которых были удалены для этого вопроса. Все дело в том, чтобы сравнить подходы :) – Shahbaz

ответ

1

ошибка что-то делать с использованием data[] и internalBuffer[] и все, что перетасовки данных. Я не вижу, чтобы реальные клиенты писались, используя что-то вроде этого кода. Любой здравомыслящий просто использовал бы BufferedInputStream.

Если вы хотите, чтобы проверить влияние различных размеров буфера, используйте new DataInputStream(new BufferedInputStream(socket.getInputStream(), bufferSize)) и readLong() и избавиться от data и internalBuffer и longExtractor в целом: они просто вызывая ненужные проблемы.

следующие работы flawlessy:

private static void clientInputstream(int bufferSize) throws IOException, UnknownHostException 
{ 
    long counter = 0; 

    Socket client = new Socket(InetAddress.getLocalHost(), PORT); 
    DataInputStream in = new DataInputStream(new BufferedInputStream(client.getInputStream(), bufferSize)); 

    try 
    { 
     for (;;) 
     { 
      long remoteTS = in.readLong(); 
      long remoteCounter = in.readLong(); 
      if (remoteCounter != counter) 
      { 
       String error = "ERROR: Expected remote counter to be " + counter + " but it was actually " + remoteCounter; 
       //System.out.println(error); 
       throw new RuntimeException(error); 
      } 
      counter++; 

      long localTS = System.nanoTime(); 
      long latency = localTS - remoteTS; 
      if (Math.abs(latency) > 1200000000) 
      { 
       out.println(String.format("ERROR: counter %s, remoteTS %s, localTS %s, diff %s", 
        counter, remoteTS, localTS, latency)); 
       continue; 
      } 
     } 
    } 
    catch (EOFException exc) 
    { 
     System.out.println("EOS"); 
    } 
    finally 
    { 
     client.close(); 
    } 
} 
+0

Ошибка была в коде внизу цикла, когда неиспользуемые байты записываются во внутренний буфер, меняя «toRead», вместо 'toRead * 16'! Спасибо за подробный ответ @EJP – Shahbaz

0

Значение, возвращаемое System.nanoTime(), относится только к работающей JVM. Вместо этого вы должны использовать System.currentTimeMillis().

This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.

Edit:

Поскольку вы проводите тест в той же JVM, источник (это) ошибки должен отличаться от того, что было сказано выше (хотя вы должны рассмотреть возможность использования'currentTimeMillis' так, чтобы значения сопоставимы между разными JVM).

Я бы рекомендовал буферизацию потока с помощью BufferedInputStream, а затем чтение и обработку блоков N (16?) Байтов за раз.

Socket client = new Socket(InetAddress.getLocalHost(), PORT); 
InputStream in = new BufferedInputStream(client.getInputStream()); 

int length = 16, offset=0;  
while (length>0) { 
    int read = in.read(data,offset,length); 
    if (read<0) ... //connection error 
    offset+=read; 
    length-=read; 
} 
+1

Как сказал OP, клиент и сервер * работают в одной JVM. – parsifal

+0

@parsifal хорошая точка (хотя это было бы проблемой после того, как код был развернут в другой JVM). – Javier

+0

@ Javier этот тест фактически имеет несколько реализаций клиентов, один из которых - буферизованный вход. Я тестирую различные методы получения данных с сервера. – Shahbaz

Смежные вопросы