2015-05-01 3 views
1

У меня очень странное поведение, читая байты из входного потока сокета.Java Socket InputStream читает недостающие байты

В моем проекте клиенты обращаются к службе. Для каждого запроса будет установлено новое соединение.

Сначала отправляются байты, которые сообщают службе, какой запрос будет следовать.

Затем сам запрос отправляется.

Служба принимает байты и отправляет запрос. Это работает как минимум на 95% от всех запросов. Для остальных 5% есть странное поведение, которое я не могу понять.

Байт - это не все байты, которые были отправлены. Но самое странное в этом вопросе то, что недостающие байты не находятся в начале или в конце потока. Они распространяются по всему потоку.

К сожалению, я не могу предоставить полный код здесь, потому что это связано с работой. Но я могу предоставить тестовый код, который показывает сама проблема.

Чтобы выяснить, что происходит, я написал 2 класса. Один происходит от java.net.Socket, а другой - от java.net.ServerSocket.

Вот код:

import java.io.ByteArrayOutputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.OutputStream; 
import java.net.Socket; 
import java.util.ArrayList; 
import java.util.List; 



public class DebugSocket extends Socket 
{ 
    private class InputStreamWrapper extends InputStream 
    { 
     private int 
      availables, 
      closes, 
      marksupporteds, 
      resets; 

     private List<Integer> 
      marks = new ArrayList<Integer>(), 
      reads = new ArrayList<Integer>(); 

     private List<Long> 
      skips = new ArrayList<Long>(); 


     @Override 
     public int available() throws IOException 
     { 
      availables++; 
      return DebugSocket.this.origininput.available(); 
     } 

     @Override 
     public void close() throws IOException 
     { 
      closes++; 
      DebugSocket.this.origininput.close(); 
     } 

     @Override 
     public synchronized void mark(int readlimit) 
     { 
      marks.add(readlimit); 
      DebugSocket.this.origininput.mark(readlimit); 
     } 

     @Override 
     public boolean markSupported() 
     { 
      marksupporteds++; 
      return DebugSocket.this.origininput.markSupported(); 
     } 

     @Override 
     public synchronized void reset() throws IOException 
     { 
      resets++; 
      DebugSocket.this.origininput.reset(); 
     } 

     @Override 
     public int read() throws IOException 
     { 
      int read = DebugSocket.this.origininput.read(); 

      reads.add(read); 

      if (read != -1) 
      { 
       DebugSocket.this.inputdebugbuffer.write(read); 
      } 

      return read; 
     } 

     @Override 
     public int read(byte[] b) throws IOException 
     { 
      int read = DebugSocket.this.origininput.read(b); 

      DebugSocket.this.inputdebugbuffer.write(b, 0, read); 

      return read; 
     } 

     @Override 
     public int read(byte[] b, int off, int len) throws IOException 
     { 
      int read = DebugSocket.this.origininput.read(b, off, len); 

      DebugSocket.this.inputdebugbuffer.write(b, off, read); 

      return read; 
     } 

     @Override 
     public long skip(long n) throws IOException 
     { 
      long skipped = DebugSocket.this.origininput.skip(n); 

      skips.add(skipped); 

      return skipped; 
     } 
    } 

    private class OutputStreamWrapper extends OutputStream 
    { 
     private int 
      flushes, 
      closes; 


     @Override 
     public void close() throws IOException 
     { 
      closes++; 
      DebugSocket.this.originoutput.close(); 
     } 

     @Override 
     public void flush() throws IOException 
     { 
      flushes++; 
      DebugSocket.this.originoutput.flush(); 
     } 

     @Override 
     public void write(int b) throws IOException 
     { 
      DebugSocket.this.outputdebugbuffer.write(b); 
      DebugSocket.this.originoutput.write(b); 
      DebugSocket.this.originoutput.flush(); 
     } 

     @Override 
     public void write(byte[] b) throws IOException 
     { 
      DebugSocket.this.outputdebugbuffer.write(b); 
      DebugSocket.this.originoutput.write(b); 
      DebugSocket.this.originoutput.flush(); 
     } 

     @Override 
     public void write(byte[] b, int off, int len) throws IOException 
     { 
      DebugSocket.this.outputdebugbuffer.write(b, off, len); 
      DebugSocket.this.originoutput.write(b, off, len); 
      DebugSocket.this.originoutput.flush(); 
     } 
    } 


    private static final Object 
     staticsynch = new Object(); 

    private static long 
     idcounter = 0; 


    private final long 
     id; 

    private final ByteArrayOutputStream 
     inputdebugbuffer, 
     outputdebugbuffer; 

    private final InputStream 
     inputwrapper; 

    private final OutputStream 
     outputwrapper; 

    private InputStream 
     origininput; 

    private OutputStream 
     originoutput; 


    public InputStream getInputStream() throws IOException 
    { 
     if (origininput == null) 
     { 
      synchronized (inputdebugbuffer) 
      { 
       if (origininput == null) 
       { 
        origininput = super.getInputStream(); 
       } 
      } 
     } 

     return inputwrapper; 
    } 

    public OutputStream getOutputStream() throws IOException 
    { 
     if (originoutput == null) 
     { 
      synchronized (outputdebugbuffer) 
      { 
       if (originoutput == null) 
       { 
        originoutput = super.getOutputStream(); 
       } 
      } 
     } 

     return outputwrapper; 
    } 


    public DebugSocket() 
    { 
     id     = getNextId(); 
     inputwrapper  = new InputStreamWrapper(); 
     outputwrapper  = new OutputStreamWrapper(); 
     inputdebugbuffer = new ByteArrayOutputStream(); 
     outputdebugbuffer = new ByteArrayOutputStream(); 
    } 


    private static long getNextId() 
    { 
     synchronized (staticsynch) 
     { 
      return ++idcounter; 
     } 
    } 
} 
import java.io.IOException; 
import java.net.ServerSocket; 


public class DebugServerSocket extends ServerSocket 
{ 
    public DebugServerSocket() throws IOException 
    { 
     super(); 
    } 

    public DebugSocket accept() throws IOException 
    { 
     DebugSocket s = new DebugSocket(); 

     implAccept(s); 

     return s; 
    } 
} 

Класс DebugSocket принимает уведомление о каждом взаимодействии с InputStream, а также OutputStream

Теперь, когда проблема возникает, я всегда могу видеть, что байты отсутствуют ,

Вот пример:

Клиент отправить 1758 байт. Я получил 23 верхних байта от члена outputdebugbuffer в DebugSocket.

Bytes: 0,0,0,0,0,0,0,2,0,0,6,-46,31,-117,8,0,0,0,0,0,0,0,-83 

Получен 227 байт. Для проблем с отладкой я всегда читаю входной поток, пока не получу -1, чтобы все байты продолжались. Теперь 16 ведущих байтов на serveride, которые я получил от члена inputdebugbuffer в DebugSocket.

Bytes: 0,0,0,6,-46,31,-117,8,0,0,0,0,0,0,0,-83 

Как показано, отсутствует 7 байт. первые 8 байтов - это длинное значение, которое я изменил на байтовое значение для отладки. Поэтому я решил, что первый байт всегда правильный.

Если это был отказ в коде, запрос не будет продолжен, но, как я уже сказал, это произойдет только до 5% всех соединений в лучшем случае.

У вас есть идеи, что здесь происходит?

Я также использовал DataInputStream и DataOutputStream для того, чтобы отправить данные. Я всегда зачищаю после каждой операции записи, как вы можете видеть в OutputStreamWrapperDebugSocket.

Я что-то пропустил?

Если требуется какой-либо другой код, я постараюсь его опубликовать.

P.S. Услуга многопоточна и обрабатывает 100 запросов параллельно. Также клиенты многопоточны и выполняют 20 запросов параллельно. Как сказано, каждый запрос использует свое одно соединение и закрывает его сразу после завершения запроса.

Надеюсь, у кого-то появилась идея по этому вопросу.

Edit:

Там нет основного метода, чтобы показать, что ничего не делает, как спросили в комментариях, но здесь блоки кода клиента и сервер, которые используются.

Клиент: (Run параллельно в 20 нитей)

public void sendRequest(long _requesttype, byte[] _bytes) 
    { 
     Socket    socket = null; 
     DataInputStream  input = null; 
     DataOutputStream output = null; 
     InputStream   sinput = null; 
     OutputStream  soutput = null; 

     try 
     { 
      socket = new DebugSocket(); 

      socket.connect(serveraddress); 

      sinput = socket.getInputStream(); 
      soutput = socket.getOutputStream(); 

      input = new DataInputStream(sinput); 
      output = new DataOutputStream(soutput); 

      output.writeLong(_requesttype); 

      output.flush(); 
      soutput.flush(); 

      output.write(_bytes); 

      output.flush(); 
      soutput.flush(); 

      // wait for notification byte that service had received all data. 
      input.readByte(); 
     } 
     catch (IOException ex) 
     { 
      LogHelper.log(ex); 
     } 
     catch (Error err) 
     { 
      throw err; 
     } 
     finally 
     { 
      output.flush(); 
      soutput.flush(); 

      input.close(); 
      output.close(); 

      finishSocket(socket); 
     } 
    } 

Сервер: (. Запуск в потоке для каждого запроса до 100 потоков)

public void proceedRequest(DebugSocket _socket) 
    { 
     DataInputStream  input = null; 
     DataOutputStream output = null; 
     InputStream   sinput = null; 
     OutputStream  soutput = null; 

     try 
     { 
      sinput = _socket.getInputStream(); 
      soutput = _socket.getOutputStream(); 

      input = new DataInputStream(sinput); 
      output = new DataOutputStream(soutput); 

      RequestHelper.proceed(input.readLong(), input, output); 

      // send notification byte to the client. 
      output.writeByte(1); 

      output.flush(); 
      soutput.flush(); 
     } 
     catch (IOException ex) 
     { 
      LogHelper.log(ex); 
     } 
     catch (Error err) 
     { 
      throw err; 
     } 
     finally 
     { 
      output.flush(); 
      soutput.flush(); 

      input.close(); 
      output.close(); 
     } 
    } 

В коде сервера задается readLong() уже не удается вызвать недостающие байты.

+1

Вы отправили некоторый код, но я не уверен, относится ли ваш код к вашей проблеме - возможно, это относится к вашей отладке, но в этом случае вы можете просто сказать: «Я получил байты [.. .] "и не объяснять, какой именно код вы использовали для этого. – immibis

+0

Если вы можете сузить свою проблему до [минимального, полного и проверяемого примера] (http://stackoverflow.com/help/mcve), это * будет * очень полезно. – immibis

+0

Вы находитесь прямо здесь, но я пытался предотвратить вопросы о моем чтении и записи поведения в коде, который связан с работой. Эти 2 класса используются и показывают проблему на самом низком уровне. Не имеет значения, какие операции выполняются с помощью DebugSocket, которые все реконструируются. – Oelsni

ответ

0

Моя догадка есть ошибка где-то еще в коде. Я скопировал класс DebugSocket из вопроса и создал MCVE (см. Ниже). Он отлично работает, я не смог воспроизвести проблему «сервер не может читать долговременную ценность». Попробуйте изменить код ниже, чтобы добавить больше своего кода, пока не сможете воспроизвести проблему, которая должна дать вам представление о том, где искать основную причину.

import java.io.*; 
import java.net.*; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.*; 

public class TestDebugSocket implements Runnable, Closeable { 

    public static void main(String[] args) { 

     TestDebugSocket m = new TestDebugSocket(); 
     try { 
      m.run(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      m.close(); 
     } 
    } 

    final int clients = 20; 
    final boolean useDebugSocket = true; 
    final byte[] someBytes = new byte[1758]; 
    final ThreadPoolExecutor tp = (ThreadPoolExecutor) Executors.newCachedThreadPool(); 
    final AtomicLong clientId = new AtomicLong(); 
    final ConcurrentLinkedQueue<Closeable> closeables = new ConcurrentLinkedQueue<Closeable>(); 
    final long maxWait = 5_000L; 
    final CountDownLatch serversReady = new CountDownLatch(clients); 
    final CountDownLatch clientsDone = new CountDownLatch(clients); 

    ServerSocket ss; 
    int port; 

    @Override public void run() { 

     try { 
      ss = useDebugSocket ? new DebugServerSocket() : new ServerSocket(); 
      ss.bind(null); 
      port = ss.getLocalPort(); 
      tp.execute(new SocketAccept()); 
      for (int i = 0; i < clients; i++) { 
       ClientSideSocket css = new ClientSideSocket(); 
       closeables.add(css); 
       tp.execute(css); 
      } 
      if (!clientsDone.await(maxWait, TimeUnit.MILLISECONDS)) { 
       System.out.println("CLIENTS DID NOT FINISH"); 
      } else { 
       System.out.println("Finished"); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      close(); 
     } 
    } 

    @Override public void close() { 

     try { if (ss != null) ss.close(); } catch (Exception ignored) {} 
     Closeable c = null; 
     while ((c = closeables.poll()) != null) { 
      try { c.close(); } catch (Exception ignored) {} 
     } 
     tp.shutdownNow(); 
    } 

    class DebugServerSocket extends ServerSocket { 

     public DebugServerSocket() throws IOException { 
      super(); 
     } 

     @Override public DebugSocket accept() throws IOException { 

      DebugSocket s = new DebugSocket(); 
      implAccept(s); 
      return s; 
     } 
    } 

    class SocketAccept implements Runnable { 

     @Override public void run() { 
      try { 
       for (int i = 0; i < clients; i++) { 
        SeverSideSocket sss = new SeverSideSocket(ss.accept()); 
        closeables.add(sss); 
        tp.execute(sss); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    class SeverSideSocket implements Runnable, Closeable { 

     Socket s; 
     public SeverSideSocket(Socket s) { 
      this.s = s; 
     } 
     @Override public void run() { 

      Long l = -1L; 
      byte[] received = new byte[someBytes.length]; 
      try { 
       DataInputStream in = new DataInputStream(s.getInputStream()); 
       DataOutputStream out = new DataOutputStream(s.getOutputStream()); 
       serversReady.countDown(); 
       if (!serversReady.await(maxWait, TimeUnit.MILLISECONDS)) { 
        System.out.println("CLIENTS DID NOT CONNECT ON TIME TO SERVER"); 
       } 
       l = in.readLong(); 
       in.readFully(received); 
       out.writeByte(1); 
       out.flush(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } finally { 
       // write to console at end to prevent synchronized socket I/O 
       System.out.println("received long: " + l); 
       close(); 
      } 
     } 

     @Override public void close() { 
      TestDebugSocket.close(s); 
      s = null; 
     } 
    } 

    class ClientSideSocket implements Runnable, Closeable { 

     Socket s; 

     @SuppressWarnings("resource") 
     @Override public void run() { 

      Long l = -1L; 
      Byte b = -1; 
      try { 
       s = useDebugSocket ? new DebugSocket() : new Socket(); 
       s.connect(new InetSocketAddress(port)); 
       DataInputStream in = new DataInputStream(s.getInputStream()); 
       DataOutputStream out = new DataOutputStream(s.getOutputStream()); 
       l = clientId.incrementAndGet(); 
       out.writeLong(l); 
       out.write(someBytes); 
       out.flush(); 
       b = in.readByte(); 
      } catch (Exception e) { 
       e.printStackTrace(); 
      } finally { 
       System.out.println("long send: " + l + ", result: " + b); 
       close(); 
       clientsDone.countDown(); 
      } 
     } 

     @Override public void close() { 
      TestDebugSocket.close(s); 
      s = null; 
     } 
    } 

    static void close(Socket s) { 

     try { if (s != null) s.close(); } catch (Exception ignored) {} 
    } 
} 
+0

Хмм да, это то, что я делаю, а если ваши системные ауты совпадают, то я получил интересную ошибку, потому что я не могу понять, что может испортить порядок байтов. Я уже добавил запись в DebugSocket, которая принимает CallStack, а также поток. Здесь тоже нет провала. Только один поток взаимодействует с сокетом, и вызовы поступают только из хорошо известных и ожидаемых мест. Я буду смотреть дальше, и если я найду причину, я опубликую ее. – Oelsni

0

Ok im done со всеми возможными способами определения причины. Из моего опыта программирования сокетов и параллельной обработки я могу сказать, что в самом коде нет ошибки. Обнюхивает, скажи мне это. Что-то на моей машине возится с трансмиссией.

Я дезактивировал все, что мог придумать (брандмауэр/антивирус/вредоносный сканер), но никакого эффекта.

У кого-то есть идея, что еще может испортить пакеты tcp?

Редактировать:

ОК, я понял. AVG 2014 возится. Jetzt, отключивший компоненты, не работал. В Options-> Settings есть пункт меню, вы можете отключить AVG-Protection.

У кого-то есть знания по этой теме?

+1

О том же проблеме сообщалось [здесь] (http://stackoverflow.com/a/26066718/3080094). Используйте [MSE] (http://windows.microsoft.com/en-us/windows/security-essentials-download) вместо AVG (я использую MSE, но не имею проблем)? – vanOekel

+0

Wow thanks. Это помогает решить проблему. Дело в том, что 5 дней назад у меня был тест, бегущий почти 4 часа без каких-либо проблем, а затем он был там. Теперь мне нужно всего несколько секунд и бум проблема всплывает ^^ – Oelsni

Смежные вопросы