я должен был реализовать фильтр для медленных соединений сервлет поэтому в основном я обернутый выходной сервлета потока в QueueOutputStream, который добавит каждый байты (в небольших буферах), в очередь, а затем выводится эти небольшие буферы во второй выходной поток, поэтому в некотором смысле это действует как поток ввода-вывода, IMHO это лучше, чем JDK-каналы, которые не будут масштабироваться так хорошо, в основном происходит слишком много переключения контекста в стандартной реализации JDK (в чтение/запись), блокирующая очередь просто идеальна для одного сценария производителя/потребителя:
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;
public class QueueOutputStream extends OutputStream
{
private static final int DEFAULT_BUFFER_SIZE=1024;
private static final byte[] END_SIGNAL=new byte[]{};
private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
private final byte[] buffer;
private boolean closed=false;
private int count=0;
public QueueOutputStream()
{
this(DEFAULT_BUFFER_SIZE);
}
public QueueOutputStream(final int bufferSize)
{
if(bufferSize<=0){
throw new IllegalArgumentException("Buffer size <= 0");
}
this.buffer=new byte[bufferSize];
}
private synchronized void flushBuffer()
{
if(count>0){
final byte[] copy=new byte[count];
System.arraycopy(buffer,0,copy,0,count);
queue.offer(copy);
count=0;
}
}
@Override
public synchronized void write(final int b) throws IOException
{
if(closed){
throw new IllegalStateException("Stream is closed");
}
if(count>=buffer.length){
flushBuffer();
}
buffer[count++]=(byte)b;
}
@Override
public synchronized void write(final byte[] b, final int off, final int len) throws IOException
{
super.write(b,off,len);
}
@Override
public synchronized void close() throws IOException
{
flushBuffer();
queue.offer(END_SIGNAL);
closed=true;
}
public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
{
return executor.submit(
new Callable<Void>()
{
@Override
public Void call() throws Exception
{
try{
byte[] buffer=queue.take();
while(buffer!=END_SIGNAL){
outputStream.write(buffer);
buffer=queue.take();
}
outputStream.flush();
} catch(Exception e){
close();
throw e;
} finally{
outputStream.close();
}
return null;
}
}
);
}
I знаю о PipedXxxStream ... но я хотел создать только один объект Pipe, который может быть задан как InputStream для одного потока, а OutputStream - для другого. Я надеялся, что, возможно, что-то пропустил. – Baginsss 2008-12-13 07:35:19
Не было бы очень сложно написать что-то, что может дать вам входной поток, который подключается к выходному потоку, но вы не сможете расширять как InputStream, так и OutputStream. Наследование, плохо. Композиция, хорошо. – Apocalisp 2008-12-13 08:14:43