2008-12-13 20 views
9

¿Alguien tiene alguna sugerencia para crear un objeto Pipe en Java que es tanto un InputStream como un OutputStream dado que Java no tiene herencia múltiple y ambas secuencias son clases abstractas en lugar de interfaces?Tubería de entrada y salida en Java

La necesidad subyacente es tener un solo objeto que se pueda pasar a cosas que necesitan un InputStream o un OutputStream para canalizar la salida de un hilo a la entrada de otro.

Respuesta

8

Parece que el punto de esta pregunta se está perdiendo. Si te entiendo correctamente, quieres un objeto que funcione como un InputStream en un hilo, y un OutputStream en otro para crear un medio de comunicación entre los dos hilos.

Quizás una respuesta es utilizar composición en lugar de herencia (que es la práctica recomendada de todos modos). Cree un conducto que contenga un PipedInputStream y un PipedOutputStream conectados entre sí, con los métodos getInputStream() y getOutputStream().

No se puede pasar directamente el objeto Pipe a algo que necesita una secuencia, pero puede pasar el valor de retorno de sus métodos get para hacerlo.

¿Eso funciona para usted?

+2

¡Amen por la composición! – Wivani

+0

¿cómo se agrega el PipedInputStream y un PipedOutputStream al Pipe y se conectan? – Sabobin

3

Esto es algo bastante común de hacer, creo. Ver esta pregunta

Easy way to write contents of a Java InputStream to an OutputStream

+0

I Conozco PipedXxxStream ... pero quería crear solo un objeto Pipe que pudiera ser dado como un InputStream a un hilo y un OutputStream a otro. Tenía la esperanza de haberme perdido algo. – Baginsss

+0

No sería muy difícil escribir algo que pueda proporcionarle un flujo de entrada que canaliza a una salida de salida, pero no podrá extender tanto InputStream como OutputStream. Herencia, malo Composición, bien. – Apocalisp

5

java.io.PipedOutputStream y java.io.PipedInputStream parecen ser las clases a utilizar para este escenario. Están diseñados para usarse juntos para canalizar datos entre hilos.

Si realmente desea que pase un solo objeto, debería contener uno de estos y exponerlos mediante getters.

1

No se puede crear una clase que deriva tanto de InputStream y OutputStream porque éstas no son las interfaces y tienen métodos comunes y Java no permite la herencia múltiple (el compilador no sabe si llamar o InputStream.close()OutputStream.close() si llama al close() en su nuevo objeto).

El otro problema es el búfer. Java quiere asignar un buffer estático para los datos (que no cambia). Esto significa que cuando use `java.io.PipedXxxStream ', los datos de escritura eventualmente se bloquearán a menos que use dos hilos diferentes.

Entonces, la respuesta de Apocalisp es correcta: debe escribir un ciclo de copiado.

Sugiero que incluya los recursos comunes de Apache en su proyecto que contiene muchas rutinas auxiliares solo para tareas como esta (copiar datos entre secuencias, archivos, cadenas y todas las combinaciones de los mismos).

+0

Puede usar clases internas para lograr algo similar a una clase que expone una secuencia de entrada y salida mientras comparte un búfer. – Charlie

0

tuviera que aplicar un filtro para conexiones lentas a servlets así que básicamente me envolvió el flujo de salida del servlet en un QueueOutputStream que se sumarán todos los bytes (en pequeñas memorias intermedias), en una cola, y luego la salida esos pequeños buffers a un segundo flujo de salida, por lo que de alguna manera esto actúa como flujo de entrada/salida, en mi humilde opinión esto es mejor que los JDK que no se escalarán bien, básicamente hay demasiada conmutación de contexto en la implementación estándar de JDK (por lectura/escritura), una cola de bloqueo es perfecta para un único escenario productor/consumidor:

import java.io.IOException; 
import java.io.OutputStream; 
import java.util.concurrent.*; 

public class QueueOutputStream extends OutputStream 
{ 
    private static final int DEFAULT_BUFFER_SIZE=1024; 
    private static final byte[] END_SIGNAL=new byte[]{}; 

    private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>(); 
    private final byte[] buffer; 

    private boolean closed=false; 
    private int count=0; 

    public QueueOutputStream() 
    { 
    this(DEFAULT_BUFFER_SIZE); 
    } 

    public QueueOutputStream(final int bufferSize) 
    { 
    if(bufferSize<=0){ 
     throw new IllegalArgumentException("Buffer size <= 0"); 
    } 
    this.buffer=new byte[bufferSize]; 
    } 

    private synchronized void flushBuffer() 
    { 
    if(count>0){ 
     final byte[] copy=new byte[count]; 
     System.arraycopy(buffer,0,copy,0,count); 
     queue.offer(copy); 
     count=0; 
    } 
    } 

    @Override 
    public synchronized void write(final int b) throws IOException 
    { 
    if(closed){ 
     throw new IllegalStateException("Stream is closed"); 
    } 
    if(count>=buffer.length){ 
     flushBuffer(); 
    } 
    buffer[count++]=(byte)b; 
    } 

    @Override 
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException 
    { 
    super.write(b,off,len); 
    } 

    @Override 
    public synchronized void close() throws IOException 
    { 
    flushBuffer(); 
    queue.offer(END_SIGNAL); 
    closed=true; 
    } 

    public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream) 
    { 
    return executor.submit(
      new Callable<Void>() 
      { 
       @Override 
       public Void call() throws Exception 
       { 
       try{ 
        byte[] buffer=queue.take(); 
        while(buffer!=END_SIGNAL){ 
        outputStream.write(buffer); 
        buffer=queue.take(); 
        } 
        outputStream.flush(); 
       } catch(Exception e){ 
        close(); 
        throw e; 
       } finally{ 
        outputStream.close(); 
       } 
       return null; 
       } 
      } 
    ); 
    } 
Cuestiones relacionadas