2008-12-13 2 views
9

Есть ли у кого-нибудь хорошие предложения по созданию объекта Pipe в Java, который равен как InputStream, так и OutputStream, поскольку Java не имеет множественного наследования, и оба потока являются абстрактными классами, а не интерфейсы?Входная и выходная потоковая труба в Java

Основополагающая необходимость состоит в том, чтобы иметь один объект, который может быть передан вещам, для которых требуется либо InputStream, либо OutputStream для вывода вывода из одного потока для ввода другого.

ответ

8

Похоже, что этот вопрос упущен. Если я правильно вас понимаю, вам нужен объект, который работает как InputStream в одном потоке, и OutputStream в другом, чтобы создать средство для обмена данными между двумя потоками.

Возможно, один ответ заключается в использовании композиции вместо наследования (что рекомендуется в любом случае). Создайте трубку, которая содержит PipedInputStream и PipedOutputStream, связанные друг с другом, с методами getInputStream() и getOutputStream().

Вы не можете напрямую передать объект Pipe тому, что требуется для потока, но вы можете передать возвращаемое значение его методов get для этого.

Это работает для вас?

3

Это, пожалуй, довольно обычная вещь. См. Этот вопрос.

Easy way to write contents of a Java InputStream to an OutputStream

+0

I знаю о PipedXxxStream ... но я хотел создать только один объект Pipe, который может быть задан как InputStream для одного потока, а OutputStream - для другого. Я надеялся, что, возможно, что-то пропустил. – Baginsss 2008-12-13 07:35:19

+0

Не было бы очень сложно написать что-то, что может дать вам входной поток, который подключается к выходному потоку, но вы не сможете расширять как InputStream, так и OutputStream. Наследование, плохо. Композиция, хорошо. – Apocalisp 2008-12-13 08:14:43

5

java.io.PipedOutputStream и java.io.PipedInputStream выглядят как классы, чтобы использовать для этого сценария. Они предназначены для совместного использования для передачи данных между потоками.

Если вы действительно хотите, чтобы какой-либо отдельный объект проходил мимо, он должен был содержать один из них и выставлять их через геттеры.

1

Вы не можете создать класс, который является производным как от InputStream и OutputStream, потому что они не являются интерфейсами, и они имеют общие методы и Java не позволяет множественное наследование (компилятор не знает, можно ли назвать InputStream.close() или OutputStream.close() если вы вызываете close() на новый объект).

Другой проблемой является буфер. Java хочет выделить статический буфер для данных (который не изменяется). Это означает, что когда вы используете `java.io.PipedXxxStream ', запись данных в него будет блокироваться, если вы не используете два разных потока.

Итак, ответ от Apocalisp верен: вы должны написать цикл копирования.

Я предлагаю вам включить в проект проект Apocal's commons-io, который содержит много вспомогательных подпрограмм только для таких задач (копировать данные между потоками, файлами, строками и всеми их комбинациями).

+0

Вы можете использовать внутренние классы для достижения чего-то похожего на один класс, отображающий как входной, так и выходной поток при совместном использовании буфера. – Charlie 2015-10-08 06:08:08

0

я должен был реализовать фильтр для медленных соединений сервлет поэтому в основном я обернутый выходной сервлета потока в QueueOutputStream, который добавит каждый байты (в небольших буферах), в очередь, а затем выводится эти небольшие буферы во второй выходной поток, поэтому в некотором смысле это действует как поток ввода-вывода, IMHO это лучше, чем JDK-каналы, которые не будут масштабироваться так хорошо, в основном происходит слишком много переключения контекста в стандартной реализации JDK (в чтение/запись), блокирующая очередь просто идеальна для одного сценария производителя/потребителя:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 
Смежные вопросы