2009-04-17 7 views
24

Al tener dos InputStreams en Java, ¿hay alguna manera de fusionarlos para que finalices con un InputStream que te da el resultado de ambas transmisiones? ¿Cómo?¿Cómo fusiona dos flujos de entrada en Java?

+3

Combinar de qué manera exactamente? Continuamente leyendo desde un segundo flujo después de que se haya leído el primero. No estoy muy familiarizado con Java, pero en C# puedes hacer esto fácilmente implementando una clase que hereda de Stream que contiene referencias a ambas transmisiones base y luego anula el método Read. – Noldorin

Respuesta

37

Como se comentó, no está claro lo que quiere decir con merge.

Tomar la entrada disponible "al azar" de cualquiera es complicado por InputStream.available no necesariamente dando una respuesta útil y bloqueando el comportamiento de las transmisiones. Necesitaría dos hilos para leer de las secuencias y luego devolver datos a través de, digamos, java.io.Piped(In|Out)putStream (aunque esas clases tienen problemas). Alternativamente, para algunos tipos de transmisión, puede ser posible utilizar una interfaz diferente, por ejemplo, java.nio canales sin bloqueo.

Si desea el contenido completo de la primera secuencia de entrada seguida de la segunda: new java.io.SequenceInputStream(s1, s2).

+2

Oh, muy bien, acabo de aprender algo nuevo. SequenceInputStream es esencialmente idéntico a mi CatInputStream, pero usa Enumeraciones heredadas en lugar de, digamos, una LinkedList. :-) –

+0

Como un hackeo a la primera parte de su respuesta, es difícil de resolver en el caso general, pero para casos específicos de FileInputStream (¿y tal vez también sockets?) Puede instanciaof/cast y crear un canal fuera de él. (Las otras transmisiones pueden usar Channels.newChannel para crear una interfaz consistente, pero no tendrán las cualidades de no bloqueo requeridas.) –

+0

Collections.enumeration es tu amigo. Olvidé una parte de mi primera parte, la editaré. –

0

No es que se me ocurra. Probablemente tendrías que leer los contenidos de las dos secuencias en un byte [] y luego crear un ByteArrayInputStream a partir de eso.

+0

Voto a favor de una solución simple, fácil de entender y práctica. Puede que no tenga el comportamiento requerido si el bloqueo es importante (o son enormes). –

4

Puede escribir una implementación personalizada InputStream que hace esto. Ejemplo:

import java.io.IOException; 
import java.io.InputStream; 
import java.util.Collections; 
import java.util.Deque; 
import java.util.LinkedList; 

public class CatInputStream extends InputStream { 
    private final Deque<InputStream> streams; 

    public CatInputStream(InputStream... streams) { 
     this.streams = new LinkedList<InputStream>(); 
     Collections.addAll(this.streams, streams); 
    } 

    private void nextStream() throws IOException { 
     streams.removeFirst().close(); 
    } 

    @Override 
    public int read() throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read()) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public int read(byte b[], int off, int len) throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read(b, off, len)) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public long skip(long n) throws IOException { 
     long skipped = 0L; 
     while (skipped < n && !streams.isEmpty()) { 
      int thisSkip = streams.getFirst().skip(n - skipped); 
      if (thisSkip > 0) 
       skipped += thisSkip; 
      else 
       nextStream(); 
     } 
     return skipped; 
    } 

    @Override 
    public int available() throws IOException { 
     return streams.isEmpty() ? 0 : streams.getFirst().available(); 
    } 

    @Override 
    public void close() throws IOException { 
     while (!streams.isEmpty()) 
      nextStream(); 
    } 
} 

Este código no está probado, por lo que su kilometraje puede variar.

+0

¿No hace esto lo mismo que SequenceInputStream, como sugiere Merzbow? –

+0

Lo sentimos, pero tackline sugirió primero SequenceInputStream (y I + 1 le hizo eso). En SO, la primera buena respuesta gana; nunca se sabe si las respuestas posteriores son plagios. Además, lea mi comentario en la respuesta de la tachuela para una comparación entre SequenceInputStream y CatInputStream (considero su punto sobre el uso de Collections.enumeration). –

+0

Si la persona que rechazó mi respuesta lo hizo debido a mi último comentario, me disculpo; Debería explicar mejor: si escribo una respuesta que resulta ser un duplicado (o subconjunto) de una respuesta publicada anteriormente, normalmente la elimino, sabiendo que nunca recibiré ningún punto por ella. En SO, realmente es "Pistola más rápida en el Oeste" (busca el título de la pregunta). –

14

java.io.SequenceInputStream podría ser lo que necesita. Acepta una enumeración de secuencias y generará el contenido de la primera transmisión, luego la segunda, y así sucesivamente hasta que todas las transmisiones estén vacías.

0

Aquí hay una implementación de MVar específica para matrices de bytes (asegúrese de agregar su propia definición de paquete). A partir de aquí, es trivial escribir una secuencia de entrada en las secuencias fusionadas. Puedo publicar eso también si así lo solicita.

import java.nio.ByteBuffer; 

public final class MVar { 

    private static enum State { 
    EMPTY, ONE, MANY 
    } 

    private final Object lock; 

    private State state; 

    private byte b; 

    private ByteBuffer bytes; 
    private int length; 

    public MVar() { 
    lock = new Object(); 
    state = State.EMPTY; 
    } 

    public final void put(byte b) { 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.b = b; 
     state = State.ONE; 
     lock.notifyAll(); 
    } 
    } 

    public final void put(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return; 
    } 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.bytes = ByteBuffer.allocateDirect(length); 
     this.bytes.put(bytes, offset, length); 
     this.bytes.position(0); 
     this.length = length; 
     state = State.MANY; 
     lock.notifyAll(); 
    } 
    } 

    public final byte take() { 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: { 
     state = State.EMPTY; 
     byte b = this.b; 
     lock.notifyAll(); 
     return b; 
     } 
     case MANY: { 
     byte b = bytes.get(); 
     state = --length <= 0 ? State.EMPTY : State.MANY; 
     lock.notifyAll(); 
     return b; 
     } 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 

    public final int take(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return 0; 
    } 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: 
     bytes[offset] = b; 
     state = State.EMPTY; 
     lock.notifyAll(); 
     return 1; 
     case MANY: 
     if (this.length > length) { 
      this.bytes.get(bytes, offset, length); 
      this.length = this.length - length; 
      synchronized (lock) { 
      lock.notifyAll(); 
      } 
      return length; 
     } 
     this.bytes.get(bytes, offset, this.length); 
     this.bytes = null; 
     state = State.EMPTY; 
     length = this.length; 
     lock.notifyAll(); 
     return length; 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 
} 
Cuestiones relacionadas