У меня очень странное поведение, читая байты из входного потока сокета.Java Socket InputStream читает недостающие байты
В моем проекте клиенты обращаются к службе. Для каждого запроса будет установлено новое соединение.
Сначала отправляются байты, которые сообщают службе, какой запрос будет следовать.
Затем сам запрос отправляется.
Служба принимает байты и отправляет запрос. Это работает как минимум на 95% от всех запросов. Для остальных 5% есть странное поведение, которое я не могу понять.
Байт - это не все байты, которые были отправлены. Но самое странное в этом вопросе то, что недостающие байты не находятся в начале или в конце потока. Они распространяются по всему потоку.
К сожалению, я не могу предоставить полный код здесь, потому что это связано с работой. Но я могу предоставить тестовый код, который показывает сама проблема.
Чтобы выяснить, что происходит, я написал 2 класса. Один происходит от java.net.Socket
, а другой - от java.net.ServerSocket
.
Вот код:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
public class DebugSocket extends Socket
{
private class InputStreamWrapper extends InputStream
{
private int
availables,
closes,
marksupporteds,
resets;
private List<Integer>
marks = new ArrayList<Integer>(),
reads = new ArrayList<Integer>();
private List<Long>
skips = new ArrayList<Long>();
@Override
public int available() throws IOException
{
availables++;
return DebugSocket.this.origininput.available();
}
@Override
public void close() throws IOException
{
closes++;
DebugSocket.this.origininput.close();
}
@Override
public synchronized void mark(int readlimit)
{
marks.add(readlimit);
DebugSocket.this.origininput.mark(readlimit);
}
@Override
public boolean markSupported()
{
marksupporteds++;
return DebugSocket.this.origininput.markSupported();
}
@Override
public synchronized void reset() throws IOException
{
resets++;
DebugSocket.this.origininput.reset();
}
@Override
public int read() throws IOException
{
int read = DebugSocket.this.origininput.read();
reads.add(read);
if (read != -1)
{
DebugSocket.this.inputdebugbuffer.write(read);
}
return read;
}
@Override
public int read(byte[] b) throws IOException
{
int read = DebugSocket.this.origininput.read(b);
DebugSocket.this.inputdebugbuffer.write(b, 0, read);
return read;
}
@Override
public int read(byte[] b, int off, int len) throws IOException
{
int read = DebugSocket.this.origininput.read(b, off, len);
DebugSocket.this.inputdebugbuffer.write(b, off, read);
return read;
}
@Override
public long skip(long n) throws IOException
{
long skipped = DebugSocket.this.origininput.skip(n);
skips.add(skipped);
return skipped;
}
}
private class OutputStreamWrapper extends OutputStream
{
private int
flushes,
closes;
@Override
public void close() throws IOException
{
closes++;
DebugSocket.this.originoutput.close();
}
@Override
public void flush() throws IOException
{
flushes++;
DebugSocket.this.originoutput.flush();
}
@Override
public void write(int b) throws IOException
{
DebugSocket.this.outputdebugbuffer.write(b);
DebugSocket.this.originoutput.write(b);
DebugSocket.this.originoutput.flush();
}
@Override
public void write(byte[] b) throws IOException
{
DebugSocket.this.outputdebugbuffer.write(b);
DebugSocket.this.originoutput.write(b);
DebugSocket.this.originoutput.flush();
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
DebugSocket.this.outputdebugbuffer.write(b, off, len);
DebugSocket.this.originoutput.write(b, off, len);
DebugSocket.this.originoutput.flush();
}
}
private static final Object
staticsynch = new Object();
private static long
idcounter = 0;
private final long
id;
private final ByteArrayOutputStream
inputdebugbuffer,
outputdebugbuffer;
private final InputStream
inputwrapper;
private final OutputStream
outputwrapper;
private InputStream
origininput;
private OutputStream
originoutput;
public InputStream getInputStream() throws IOException
{
if (origininput == null)
{
synchronized (inputdebugbuffer)
{
if (origininput == null)
{
origininput = super.getInputStream();
}
}
}
return inputwrapper;
}
public OutputStream getOutputStream() throws IOException
{
if (originoutput == null)
{
synchronized (outputdebugbuffer)
{
if (originoutput == null)
{
originoutput = super.getOutputStream();
}
}
}
return outputwrapper;
}
public DebugSocket()
{
id = getNextId();
inputwrapper = new InputStreamWrapper();
outputwrapper = new OutputStreamWrapper();
inputdebugbuffer = new ByteArrayOutputStream();
outputdebugbuffer = new ByteArrayOutputStream();
}
private static long getNextId()
{
synchronized (staticsynch)
{
return ++idcounter;
}
}
}
import java.io.IOException;
import java.net.ServerSocket;
public class DebugServerSocket extends ServerSocket
{
public DebugServerSocket() throws IOException
{
super();
}
public DebugSocket accept() throws IOException
{
DebugSocket s = new DebugSocket();
implAccept(s);
return s;
}
}
Класс DebugSocket
принимает уведомление о каждом взаимодействии с InputStream
, а также OutputStream
Теперь, когда проблема возникает, я всегда могу видеть, что байты отсутствуют ,
Вот пример:
Клиент отправить 1758 байт. Я получил 23 верхних байта от члена outputdebugbuffer
в DebugSocket
.
Bytes: 0,0,0,0,0,0,0,2,0,0,6,-46,31,-117,8,0,0,0,0,0,0,0,-83
Получен 227 байт. Для проблем с отладкой я всегда читаю входной поток, пока не получу -1, чтобы все байты продолжались. Теперь 16 ведущих байтов на serveride, которые я получил от члена inputdebugbuffer
в DebugSocket
.
Bytes: 0,0,0,6,-46,31,-117,8,0,0,0,0,0,0,0,-83
Как показано, отсутствует 7 байт. первые 8 байтов - это длинное значение, которое я изменил на байтовое значение для отладки. Поэтому я решил, что первый байт всегда правильный.
Если это был отказ в коде, запрос не будет продолжен, но, как я уже сказал, это произойдет только до 5% всех соединений в лучшем случае.
У вас есть идеи, что здесь происходит?
Я также использовал DataInputStream
и DataOutputStream
для того, чтобы отправить данные. Я всегда зачищаю после каждой операции записи, как вы можете видеть в OutputStreamWrapper
DebugSocket
.
Я что-то пропустил?
Если требуется какой-либо другой код, я постараюсь его опубликовать.
P.S. Услуга многопоточна и обрабатывает 100 запросов параллельно. Также клиенты многопоточны и выполняют 20 запросов параллельно. Как сказано, каждый запрос использует свое одно соединение и закрывает его сразу после завершения запроса.
Надеюсь, у кого-то появилась идея по этому вопросу.
Edit:
Там нет основного метода, чтобы показать, что ничего не делает, как спросили в комментариях, но здесь блоки кода клиента и сервер, которые используются.
Клиент: (Run параллельно в 20 нитей)
public void sendRequest(long _requesttype, byte[] _bytes)
{
Socket socket = null;
DataInputStream input = null;
DataOutputStream output = null;
InputStream sinput = null;
OutputStream soutput = null;
try
{
socket = new DebugSocket();
socket.connect(serveraddress);
sinput = socket.getInputStream();
soutput = socket.getOutputStream();
input = new DataInputStream(sinput);
output = new DataOutputStream(soutput);
output.writeLong(_requesttype);
output.flush();
soutput.flush();
output.write(_bytes);
output.flush();
soutput.flush();
// wait for notification byte that service had received all data.
input.readByte();
}
catch (IOException ex)
{
LogHelper.log(ex);
}
catch (Error err)
{
throw err;
}
finally
{
output.flush();
soutput.flush();
input.close();
output.close();
finishSocket(socket);
}
}
Сервер: (. Запуск в потоке для каждого запроса до 100 потоков)
public void proceedRequest(DebugSocket _socket)
{
DataInputStream input = null;
DataOutputStream output = null;
InputStream sinput = null;
OutputStream soutput = null;
try
{
sinput = _socket.getInputStream();
soutput = _socket.getOutputStream();
input = new DataInputStream(sinput);
output = new DataOutputStream(soutput);
RequestHelper.proceed(input.readLong(), input, output);
// send notification byte to the client.
output.writeByte(1);
output.flush();
soutput.flush();
}
catch (IOException ex)
{
LogHelper.log(ex);
}
catch (Error err)
{
throw err;
}
finally
{
output.flush();
soutput.flush();
input.close();
output.close();
}
}
В коде сервера задается readLong()
уже не удается вызвать недостающие байты.
Вы отправили некоторый код, но я не уверен, относится ли ваш код к вашей проблеме - возможно, это относится к вашей отладке, но в этом случае вы можете просто сказать: «Я получил байты [.. .] "и не объяснять, какой именно код вы использовали для этого. – immibis
Если вы можете сузить свою проблему до [минимального, полного и проверяемого примера] (http://stackoverflow.com/help/mcve), это * будет * очень полезно. – immibis
Вы находитесь прямо здесь, но я пытался предотвратить вопросы о моем чтении и записи поведения в коде, который связан с работой. Эти 2 класса используются и показывают проблему на самом низком уровне. Не имеет значения, какие операции выполняются с помощью DebugSocket, которые все реконструируются. – Oelsni