2010-01-15 12 views
25

Tenga en cuenta la siguiente secuencia de comandos shell:Estrategias para tuberías concurrentes en Java

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

Este tiene tres procesos que trabajan en paralelo para descomprimir una corriente, modificarlo, y volver a comprimirlo. Ejecutando time Veo que el tiempo de mi usuario es aproximadamente el doble que el de mi tiempo real, lo que indica que el programa funciona efectivamente en paralelo.

He intentado crear el mismo programa en Java colocando cada tarea en su propio hilo. Desafortunadamente, el programa Java multiproceso es solo aproximadamente 30% faster que la versión single threaded para la muestra anterior. Intenté usar un Exchanger y un ConcurrentLinkedQueue. La cola vinculada a ConcurrentLinkedQueue causa mucha controversia, aunque los tres hilos generalmente se mantienen ocupados. El Intercambiador tiene menor contención, pero es más complicado, y no parece mantener al trabajador más lento ejecutándose el 100% del tiempo.

Estoy tratando de encontrar una solución pura de Java para este problema sin mirar uno de los marcos de tejido de bytes o un MPI basado en JNI.

La mayor parte de la investigación concurrente y las API se refieren a sí mismos con los algoritmos divide-and-conquer, dando a cada trabajo del nodo que es ortogonal y no dependiente en los cálculos anteriores. Otro enfoque de concurrencia es el enfoque de canalización, donde cada trabajador hace un trabajo y pasa los datos al siguiente trabajador.

No estoy tratando de encontrar la forma más eficiente de extraer un archivo gzip'd, sino más bien estoy buscando cómo descomponer eficientemente las tareas en una tubería, a fin de reducir el tiempo de ejecución al de la la tarea más lenta.

tiempos actuales de un archivo de línea de 10 metros son los siguientes:

Testing via shell 

real 0m31.848s 
user 0m58.946s 
sys  0m1.694s 

Testing SerialTest 

real 0m59.997s 
user 0m59.263s 
sys  0m1.121s 

Testing ParallelExchangerTest 

real 0m41.573s 
user 1m3.436s 
sys  0m1.830s 

Testing ConcurrentQueueTest 

real 0m44.626s 
user 1m24.231s 
sys  0m10.856s 

Estoy ofreciendo una recompensa por una mejora del 10% en Java, medida por el tiempo real en un sistema de cuatro núcleos con filas de 10m datos de prueba. Las fuentes actuales están disponibles en Bitbucket.

+2

Un ejemplo de código de lo que está haciendo en Java sería muy útil para ofrecer mejoras. Es difícil ver lo que ha intentado –

+0

Siga los enlaces. – dmeister

+0

¿Está probando en una máquina con varias CPU? No estoy seguro si la JVM puede usar más de 1 CPU. –

Respuesta

6

He verificado el tiempo de forma individual, parece que la lectura lleva menos del 10% del tiempo, y la lectura y el procesamiento llevan menos del 30% de todo el tiempo. Así que tomé ParallelExchangerTest (mejor rendimiento en su código) y lo modifiqué a solo tengo 2 hilos, el primer hilo lee & reemplaza, y el segundo hilo hace la escritura.

Estas son las cifras para comparar (en mi máquina Intel de doble núcleo (no Core2) con Ubuntu con 1 GB de RAM)

> Pruebas mediante el intérprete de

0m41.601s reales

0m58.604s usuario

sys 0m1.032s

> Pruebas ParallelExchangerTest

1m55.424s

2m14.160s usuario

sys 0m4.768s

> ParallelExchangerTestMod (2 hilos)

1m35.524s reales reales

us er 1m55.319s

sys 0m3.580s

sabía que el procesamiento de cadenas tarda más tiempo por lo sustituyo line.repalce con matcher.replaceAll, tengo esta cifras

> ParallelExchangerTestMod_Regex (2 hilo)

1m12.781s reales

1m33.382s usuario

sys 0m2.916s

Ahora Di un paso por delante, en vez de leer una línea a la vez, leí char [] amortiguamiento de varios tamaños y cronometré, (con la expresión regular buscar/reemplazar,) llegué estas cifras

> Pruebas ParallelExchangerTestMod_Regex_Buff (procesamiento de 100 bytes a la vez)

1m13.804s reales

1m32.494s usuario

sys 0m2.676s

> Pruebas ParallelExchangerTestMod_Regex_Buff (procesamiento de 500 bytes en el tiempo)

1m6.286s reales

usuario 1m29.334s

sys 0m2.324s

> Pruebas ParallelExchangerTestMod_Regex_Buff (procesamiento de 800 bytes a la vez)

1m12.309s reales

1m33.910s usuario

sys 0m2.476s

se parece a 500 bytes es óptimo para el tamaño de los datos.

I en forma de horquilla y tener una copia de mis cambios aquí

https://bitbucket.org/chinmaya/java-concurrent_response/

+0

Revisé sus cambios y los ejecuté en una máquina Solaris. Los resultados fueron bastante diferentes de Ubuntu. El más rápido corrió 1,5 segundos más rápido que mi ParallelExchangerTest. ParallelExchangerTestMod_Regex verdadera 0m40.418s usuario 0m56.314s sys 0m1.374s Ejecución de la misma prueba en Ubuntu, Cygwin, y OS X muestra los resultados varían un poco de plataforma en plataforma. – brianegge

+0

ciertamente, la implementación de JVM cambia de una plataforma a otra. Es posible que desee probar la optimización en tiempo de compilación (javac -O) y runtime (java -X). – chinmaya

14

En primer lugar, el proceso solo será tan rápido como la pieza más lenta. Si la avería momento es:

  • gunzip: 1 segundos
  • SED: 5 segundos
  • gzip: 1 segundo

yendo multi-hilo se le hecho en en el mejor 5 segundos en lugar de 7.

En segundo lugar, en lugar de utilizar las colas que está utilizando, en su lugar intente replicar la funcionalidad de lo que está copiando y use PipedInputStream y PipedOutputStream para encadenar procesos juntos.

Editar: hay algunas maneras de manejar tareas relacionadas con utilidades de concurrencia de Java. Divídalo en hilos. En primer lugar, cree una clase base común:

public interface Worker { 
    public run(InputStream in, OutputStream out); 
} 

Lo que esta interfaz hace es representar un trabajo arbitrario que procesa la entrada y genera resultados. Encadena estos juntos y tienes una tubería. Usted puede abstraer el texto repetitivo también.Para ello necesitamos una clase:

public class UnitOfWork implements Runnable { 
    private final InputStream in; 
    private final OutputStream out; 
    private final Worker worker; 

    public UnitOfWork(InputStream in, OutputStream out, Worker worker) { 
    if (in == null) { 
     throw new NullPointerException("in is null"); 
    } 
    if (out == null) { 
     throw new NullPointerException("out is null"); 
    } 
    if (worker == null) { 
     throw new NullPointerException("worker is null"); 
    } 
    this.in = in; 
    this.out = out; 
    this.worker = worker; 
    } 

    public final void run() { 
    worker.run(in, out); 
    } 
} 

Así, por ejemplo, el Unzip PARTE:

public class Unzip implements Worker { 
    protected void run(InputStream in, OutputStream out) { 
    ... 
    } 
} 

y así sucesivamente para Sed y Zip. Lo que a continuación se une juntos es la siguiente:

public static void pipe(InputStream in, OutputStream out, Worker... workers) { 
    if (workers.length == 0) { 
    throw new IllegalArgumentException("no workers"); 
    } 
    OutputStream last = null; 
    List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length); 
    PipedOutputStream last = null; 
    for (int i=0; i<workers.length-2; i++) { 
    PipedOutputStream out = new PipedOutputStream(); 
    work.add(new UnitOfWork(
     last == null ? in, new PipedInputStream(last), out, workers[i]); 
    last = out; 
    } 
    work.add(new UnitOfWork(new PipedInputStream(last), 
    out, workers[workers.length-1); 
    ExecutorService exec = Executors.newFixedThreadPool(work.size()); 
    for (UnitOfWork w : work) { 
    exec.submit(w); 
    } 
    exec.shutdown(); 
    try { 
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
    } catch (InterruptedExxception e) { 
    // do whatever 
    } 
} 

No estoy seguro de que puede hacer mucho mejor que eso y no hay código mínimo de escribir para cada trabajo. Luego su código pasa a ser:

public static processFile(String inputName, String outputName) { 
    pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile), 
    new Zip(), new Sed(), new Unzip()); 
} 
+3

en algunas pruebas que realicé en el cifrado multiproceso Hice algo muy similar, pero el aumento real en el rendimiento se produjo cuando hice mi propia implementación de almacenamiento en búfer. las transmisiones canalizadas y las secuencias en búfer ya están almacenadas en el búfer, pero el tamaño del búfer introduce una gran cantidad de sobrecarga, a menos que los búferes estén alineados con lo que sea que el algoritmo de transformación sea. entonces, si zip funciona en 1k byte a tiempo, use un buffer personalizado de esa dimensión para alimentar los datos antes de la compresión, si sed usa una línea que recupera 128 líneas a la vez, este tipo de cosas aumentan enormemente la contención de reducción de velocidad y sobrecarga (y complejidad, otoh ...). –

3

También puede usar tuberías en Java. Se implementan como flujos, ver PipedInputStream y PipedOutputStream para más detalles.

Para evitar el bloqueo, yo recomendaría poner un tamaño de tubería adecuado.

+0

El PipedOutputStream y PipedInputStream son útiles cuando necesita conectar un flujo de salida a una entrada de vapor. La clase 'Sed' en la prueba de serie está haciendo efectivamente lo que las dos clases anteriores harían. No estoy tratando de lograr tuberías, sino más bien tuberías concurrentes. Las tuberías son como una línea de ensamblaje de automóviles, pasando el trabajo de una etapa a otra. Ejecutar cada etapa en su propio hilo ofrece la posibilidad de concurrencia en tareas que no se pueden ejecutar en paralelo. – brianegge

+0

Además; Acabo de probar el rendimiento de PidedIOStream, y en realidad es solo un semáforo, y solo un hilo a la vez puede funcionar en el búfer subyacente. Sin embargo, creo que sería posible volver a escribir las clases para usar más de un búfer y aumentar el rendimiento. – KarlP

3

Teniendo en cuenta que usted no está diciendo la forma en que se está midiendo el tiempo transcurrido, estoy asumiendo que usted está usando algo como:

time java org.egge.concurrent.SerialTest <in.gz> out.gz 
time java org.egge.concurrent.ConcurrentQueueTest <in.gz> out.gz 

El problema con esto es que se está midiendo dos cosas aquí:

  1. Cuánto tarda la JVM en iniciarse y
  2. Cuánto tarda el programa en ejecutarse.

Solo puede cambiar la segunda con los cambios de código. Utilizando las cifras que le dio:

Testing SerialTest 
real 0m6.736s 
user 0m6.924s 
sys  0m0.245s 

Testing ParallelExchangerTest 
real 0m4.967s 
user 0m7.491s 
sys  0m0.850s 

Si asumimos que el inicio de la JVM toma tres segundos, entonces el "tiempo del programa de gestión" es de 3,7 y 1,9 segundos respectivamente, esto es más o menos un aumento de velocidad 100%. Le sugiero encarecidamente que utilice un conjunto de datos más grande para probar, de modo que pueda minimizar el impacto del inicio de la JVM en los resultados de tiempo.

Editar: Según sus respuestas a esta pregunta, es posible que esté sufriendo un conflicto de bloqueo. La mejor manera de resolver eso en Java es usar los lectores y escritores de texto, leer de los tubos, byte a la vez, y reemplazar cualquier '@' caracteres en el flujo de entrada con un "_at_" en el flujo de salida. Es posible que sufras por el hecho de que cada cuerda se escanea tres veces, y cualquier reemplazo requiere la construcción de un nuevo objeto, y la cuerda termina siendo copiada nuevamente. Espero que esto ayude ...

+0

Si ejecuto la prueba con un solo registro, y puedo ver tiempos reales de 0m0.292s. La prueba no es ideal ya que la última etapa es más del doble de intensa que las dos primeras. – brianegge

+0

El arnés de prueba que estoy usando está en http://bitbucket.org/brianegge/java-concurrent/src/tip/bin/test.Al ejecutar 10m filas, el ParallelExchanger muestra un tiempo de 'usuario' casi el mismo que el del shell, pero un tiempo real que es 10 segundos más largo. Si puedo mejorar la eficiencia, puede ejecutarse al mismo tiempo que el script de shell. – brianegge

0

reducción del número de lecturas y objetos que me dan más de un 10% más de rendimiento.

Pero el rendimiento de java.util.concurrent sigue siendo un poco decepcionante.

ConcurrentQueueTest:

private static class Reader implements Runnable { 

@Override 
    public void run() { 
    final char buf[] = new char[8192]; 
    try { 

    int len; 
    while ((len = reader.read(buf)) != -1) { 
    pipe.put(new String(buf,0,len)); 
    } 
    pipe.put(POISON); 

    } catch (IOException e) { 
    throw new RuntimeException(e); 
    } catch (InterruptedException e) { 
    throw new RuntimeException(e); 
    } 
    } 
Cuestiones relacionadas