He escrito las implementaciones de fondo InputStream
(y OutputStream
) que envuelven otras transmisiones y leen adelante en una cadena de fondo, permitiendo principalmente la descompresión/compresión en diferentes subprocesos del procesamiento de la secuencia descomprimida.Implementaciones de InputStream en segundo plano almacenadas
Es un modelo de productor/consumidor bastante estándar.
Esto parece una manera fácil de hacer un buen uso de las CPU multi-core con procesos simples que leen, procesan y escriben datos, lo que permite un uso más eficiente de los recursos de la CPU y del disco. Tal vez 'eficiente' no es la mejor palabra, pero proporciona una mayor utilización, y de más interés para mí, tiempos de ejecución reducidos, en comparación con la lectura directamente de ZipInputStream
y escribir directamente en ZipOutputStream
.
Me complace publicar el código, pero mi pregunta es si estoy reinventando algo disponible en las bibliotecas existentes (y más intensas)?
Editar - código de publicación ...
Mi código para el BackgroundInputStream
es a continuación (el BackgroundOutputStream
es muy similar), pero hay aspectos que me gustaría mejorar.
- Parece que estoy trabajando demasiado duro para pasar los búferes hacia atrás y hacia adelante.
- Si el código de llamada arroja referencias al
BackgroundInputStream
, elbackgroundReaderThread
se mantendrá indefinidamente. - Signaling
eof
necesita mejorar. - Las excepciones se deben propagar al subproceso en primer plano.
- Me gustaría permitir el uso de un hilo de un proporcionado
Executor
. - El método
close()
debe señalar el hilo de fondo y no debe cerrar el flujo envuelto, ya que el flujo envuelto debe ser propiedad del hilo de fondo que lee de él. - Hacer cosas tontas como leer después del cierre debe atenderse adecuadamente.
package nz.co.datacute.io;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
public class BackgroundInputStream extends InputStream {
private static final int DEFAULT_QUEUE_SIZE = 1;
private static final int DEFAULT_BUFFER_SIZE = 64*1024;
private final int queueSize;
private final int bufferSize;
private volatile boolean eof = false;
private LinkedBlockingQueue<byte[]> bufferQueue;
private final InputStream wrappedInputStream;
private byte[] currentBuffer;
private volatile byte[] freeBuffer;
private int pos;
public BackgroundInputStream(InputStream wrappedInputStream) {
this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
}
public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
this.wrappedInputStream = wrappedInputStream;
this.queueSize = queueSize;
this.bufferSize = bufferSize;
}
@Override
public int read() throws IOException {
if (bufferQueue == null) {
bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
BackgroundReader backgroundReader = new BackgroundReader();
Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
backgroundReaderThread.start();
}
if (currentBuffer == null) {
try {
if ((!eof) || (bufferQueue.size() > 0)) {
currentBuffer = bufferQueue.take();
pos = 0;
} else {
return -1;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int b = currentBuffer[pos++];
if (pos == currentBuffer.length) {
freeBuffer = currentBuffer;
currentBuffer = null;
}
return b;
}
@Override
public int available() throws IOException {
if (currentBuffer == null) return 0;
return currentBuffer.length;
}
@Override
public void close() throws IOException {
wrappedInputStream.close();
currentBuffer = null;
freeBuffer = null;
}
class BackgroundReader implements Runnable {
@Override
public void run() {
try {
while (!eof) {
byte[] newBuffer;
if (freeBuffer != null) {
newBuffer = freeBuffer;
freeBuffer = null;
} else {
newBuffer = new byte[bufferSize];
}
int bytesRead = 0;
int writtenToBuffer = 0;
while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
writtenToBuffer += bytesRead;
}
if (writtenToBuffer > 0) {
if (writtenToBuffer < bufferSize) {
newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
}
bufferQueue.put(newBuffer);
}
if (bytesRead == -1) {
eof = true;
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
¿Has encontrado tu respuesta? ¿Está disponible en cualquiera de las bibliotecas existentes? – adi
@adi Creo que la simplicidad de las soluciones de un solo subproceso significa que es más seguro utilizar múltiples núcleos procesando varios archivos a la vez, cada uno en un único subproceso, en lugar de utilizar varios subprocesos para acelerar el procesamiento de un único archivo.Si solo tiene que procesar un archivo, probablemente deba esperar el tiempo adicional requerido para procesarlo en un solo hilo. –