2009-10-30 10 views
15

Estoy intentando descubrir cómo usar correctamente los ejecutores de Java. Me doy cuenta de que enviar tareas a un ExecutorService tiene su propia sobrecarga. Sin embargo, me sorprende ver que es tan alto como lo es.El sorprendente punto de equilibrio de rendimiento del ExecutorService --- ¿reglas de oro?

Mi programa necesita procesar gran cantidad de datos (datos bursátiles) con la latencia más baja posible. La mayoría de los cálculos son operaciones aritméticas bastante simples.

Me trataron de probar algo muy simple: "Math.random() * Math.random()"

La prueba más simple se ejecuta este cálculo en un bucle simple. La segunda prueba realiza el mismo cálculo dentro de Runnable anónimo (se supone que esto mide el costo de crear objetos nuevos). La tercera prueba pasa el Runnable a ExecutorService (esto mide el costo de introducir ejecutores).

me corrieron las pruebas en mi portátil pequeño (2 CPUs, 1,5 concierto RAM):

(in milliseconds) 
simpleCompuation:47 
computationWithObjCreation:62 
computationWithObjCreationAndExecutors:422 

(aproximadamente una vez cada cuatro carreras, los dos primeros números terminan siendo iguales)

Aviso que los ejecutores toman mucho, mucho más tiempo que ejecutando en un solo hilo. Los números eran aproximadamente los mismos para los tamaños de grupo de subprocesos entre 1 y 8.

Pregunta: ¿Me falta algo obvio o se esperan estos resultados? Estos resultados me dicen que cualquier tarea que paso a un ejecutor debe hacer un cálculo no trivial. Si estoy procesando millones de mensajes y necesito realizar transformaciones muy simples (y baratas) en cada mensaje, aún no puedo usar ejecutores ... tratar de distribuir cálculos entre múltiples CPU podría terminar siendo más costoso que solo haciéndolos en un solo hilo. La decisión de diseño se vuelve mucho más compleja de lo que originalmente pensé. ¿Alguna idea?


import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ExecServicePerformance { 

private static int count = 100000; 

public static void main(String[] args) throws InterruptedException { 

    //warmup 
    simpleCompuation(); 
    computationWithObjCreation(); 
    computationWithObjCreationAndExecutors(); 

    long start = System.currentTimeMillis(); 
    simpleCompuation(); 
    long stop = System.currentTimeMillis(); 
    System.out.println("simpleCompuation:"+(stop-start)); 

    start = System.currentTimeMillis(); 
    computationWithObjCreation(); 
    stop = System.currentTimeMillis(); 
    System.out.println("computationWithObjCreation:"+(stop-start)); 

    start = System.currentTimeMillis(); 
    computationWithObjCreationAndExecutors(); 
    stop = System.currentTimeMillis(); 
    System.out.println("computationWithObjCreationAndExecutors:"+(stop-start)); 


} 

private static void computationWithObjCreation() { 
    for(int i=0;i<count;i++){ 
    new Runnable(){ 

    @Override 
    public void run() { 
    double x = Math.random()*Math.random(); 
    } 

    }.run(); 
    } 

} 

private static void simpleCompuation() { 
    for(int i=0;i<count;i++){ 
    double x = Math.random()*Math.random(); 
    } 

} 

private static void computationWithObjCreationAndExecutors() 
    throws InterruptedException { 

    ExecutorService es = Executors.newFixedThreadPool(1); 
    for(int i=0;i<count;i++){ 
    es.submit(new Runnable() { 
    @Override 
    public void run() { 
    double x = Math.random()*Math.random();  
    } 
    }); 
    } 
    es.shutdown(); 
    es.awaitTermination(10, TimeUnit.SECONDS); 
} 
} 
+0

Wow, la vista previa formateó el código mucho mejor que el resultado final. ¿Cómo puedo arreglar esto? – Shahbaz

+1

Acabo de reformatearlo, ¿se ve mejor? –

+0

Gracias ZZ Coder, el código ahora parece que debería – Shahbaz

Respuesta

19
  1. Uso ejecutores es en la utilización de CPU y/o núcleos de CPU, por lo que si se crea un grupo de subprocesos que utiliza la cantidad de CPU en el mejor, usted tiene que tener tantos procesos como CPU/núcleos.
  2. Tiene razón, crear nuevos objetos cuesta demasiado. Entonces, una forma de reducir los gastos es usar lotes. Si conoce el tipo y la cantidad de cálculos que debe hacer, crea lotes. Por lo tanto, piense en mil (s) cálculos realizados en una tarea ejecutada. Usted crea lotes para cada hilo. Tan pronto como finaliza el cálculo (java.util.concurrent.Future), crea el siguiente lote. Incluso la creación de nuevos lotes se puede hacer en parralel (4 CPUs -> 3 hilos para computación, 1 hilo para aprovisionamiento por lotes). Al final, puede terminar con un mayor rendimiento, pero con mayores demandas de memoria (lotes, aprovisionamiento).

Editar: Cambié tu ejemplo y lo dejé funcionar en mi pequeña computadora portátil dual-core x200.

provisioned 2 batches to be executed 
simpleCompuation:14 
computationWithObjCreation:17 
computationWithObjCreationAndExecutors:9 

Como se puede ver en el código fuente, que tomó el ciclo de vida de la transferencia por lotes y ejecutor de la medida, también. Eso es más justo en comparación con los otros dos métodos.

Ver los resultados por sí mismo ...

import java.util.List; 
import java.util.Vector; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ExecServicePerformance { 

    private static int count = 100000; 

    public static void main(String[] args) throws InterruptedException { 

     final int cpus = Runtime.getRuntime().availableProcessors(); 

     final ExecutorService es = Executors.newFixedThreadPool(cpus); 

     final Vector<Batch> batches = new Vector<Batch>(cpus); 

     final int batchComputations = count/cpus; 

     for (int i = 0; i < cpus; i++) { 
      batches.add(new Batch(batchComputations)); 
     } 

     System.out.println("provisioned " + cpus + " batches to be executed"); 

     // warmup 
     simpleCompuation(); 
     computationWithObjCreation(); 
     computationWithObjCreationAndExecutors(es, batches); 

     long start = System.currentTimeMillis(); 
     simpleCompuation(); 
     long stop = System.currentTimeMillis(); 
     System.out.println("simpleCompuation:" + (stop - start)); 

     start = System.currentTimeMillis(); 
     computationWithObjCreation(); 
     stop = System.currentTimeMillis(); 
     System.out.println("computationWithObjCreation:" + (stop - start)); 

     // Executor 

     start = System.currentTimeMillis(); 
     computationWithObjCreationAndExecutors(es, batches);  
     es.shutdown(); 
     es.awaitTermination(10, TimeUnit.SECONDS); 
     // Note: Executor#shutdown() and Executor#awaitTermination() requires 
     // some extra time. But the result should still be clear. 
     stop = System.currentTimeMillis(); 
     System.out.println("computationWithObjCreationAndExecutors:" 
       + (stop - start)); 
    } 

    private static void computationWithObjCreation() { 

     for (int i = 0; i < count; i++) { 
      new Runnable() { 

       @Override 
       public void run() { 

        double x = Math.random() * Math.random(); 
       } 

      }.run(); 
     } 

    } 

    private static void simpleCompuation() { 

     for (int i = 0; i < count; i++) { 
      double x = Math.random() * Math.random(); 
     } 

    } 

    private static void computationWithObjCreationAndExecutors(
      ExecutorService es, List<Batch> batches) 
      throws InterruptedException { 

     for (Batch batch : batches) { 
      es.submit(batch); 
     } 

    } 

    private static class Batch implements Runnable { 

     private final int computations; 

     public Batch(final int computations) { 

      this.computations = computations; 
     } 

     @Override 
     public void run() { 

      int countdown = computations; 
      while (countdown-- > -1) { 
       double x = Math.random() * Math.random(); 
      } 
     } 
    } 
} 
+0

Solución interesante. Me da algunas ideas sobre cómo cambiar mi uso de ejecutores. – Shahbaz

+0

+1, muy buen ejemplo. –

+0

hola, si me quedo este ejemplo en un doble núcleo MacOSX, llegué: simpleComputation: 268 computationWithObjCreation: 155 computation2: 0, porque el resultado de computationWithObjCreationAndExecutors no se recupera? Si moví el es.shutdown() y es.awaitTermination antes de que nos tomamos el tiempo de parada, entonces el resultado: aprovisionan: 2 lotes que se ejecutarán simpleComputation: 261 computationWithObjCreation: 92 computationWithObjCreationAndExecutors: 126 donde computationWithObjCreationAndExecutors tiene un rendimiento peor que computationWithObjCreation. ¿Por qué está pasando esto? – portoalet

6

Esto no es una prueba justa para el grupo de subprocesos por razones siguientes,

  1. usted no está tomando ventaja de la puesta en común en absoluto, ya que sólo tiene 1 hilo.
  2. El trabajo es demasiado simple como para que la sobrecarga de agrupación no se pueda justificar. Una multiplicación en una CPU con FPP solo lleva unos pocos ciclos.

Teniendo en cuenta los siguientes pasos adicionales del grupo de subprocesos tiene que hacer, además de la creación de objetos y ejecutar el trabajo,

  1. Coloque el trabajo en la cola
  2. quitar el trabajo de la cola
  3. Obtener el hilo del grupo y ejecutar el trabajo
  4. Devolver el hilo al grupo

Cuando tiene un trabajo real y varios subprocesos, el beneficio del grupo de subprocesos será evidente.

+1

I segundo ZZ Coder; en mi experiencia, los beneficios serán más evidentes cuando el grupo de subprocesos sea más grande. – Everyone

+0

El ejecutor no tiene que "obtener" y "devolver" un hilo. Crea un hilo de trabajo interno que sondea() la cola de tareas. Además, dada la baja complejidad del tiempo de la tarea, probablemente sea una ventaja usar solo un hilo; de lo contrario, existe la posibilidad de que el bloqueo en el BlockingQueue se contenga y cause problemas al mover los subprocesos de trabajo dentro y fuera del Estado ejecutable. ¿Costo real? Ir al kernel para crear un hilo y también llamar a una operación de bloqueo mientras espera que el hilo termine. 100,000 no es mucho. Pero la lección aprendida, el ajuste del rendimiento requiere pruebas. –

+0

Probé tamaños de grupos de subprocesos entre 1 y 8, todos devolvieron los mismos números.Me concentré en el tamaño de grupo de 1 porque quería medir la sobrecarga del marco ejecutor. Su comentario refuerza que necesito estudiar más a fondo las partes internas del marco. – Shahbaz

0

En primer lugar, hay algunos problemas con el microbenchmark. Haces un calentamiento, que es bueno.Sin embargo, es mejor ejecutar la prueba varias veces, lo que debería dar una idea de si realmente se ha calentado y la varianza de los resultados. También tiende a ser mejor hacer la prueba de cada algoritmo en corridas separadas, de lo contrario, puede causar desoptimización cuando cambia un algoritmo.

La tarea es muy pequeña, aunque no estoy del todo seguro de cuán pequeña. Entonces, el número de veces más rápido no tiene sentido. En situaciones multiproceso, tocará las mismas ubicaciones volátiles para que los hilos puedan causar un rendimiento realmente malo (use una instancia Random por hilo). Además, una ejecución de 47 milisegundos es un poco corta.

Ciertamente, ir a otro hilo para una pequeña operación no va a ser rápido. Dividir las tareas en tamaños más grandes si es posible. JDK7 parece tener un marco fork-join, que intenta admitir tareas precisas de los algoritmos de división y conquista al preferir ejecutar tareas en el mismo subproceso en orden, con tareas más grandes extraídas por subprocesos inactivos.

+0

Buen punto acerca de ejecutar la prueba varias veces. De hecho lo ejecuté muchas veces, solo pegué un solo resultado. Entiendo tu punto para mejorar el punto de referencia. – Shahbaz

4

No creo que esto es en absoluto realista ya que está creando un nuevo servicio ejecutor cada vez que realiza la llamada al método. A menos que tenga requisitos muy extraños que parezcan poco realistas, normalmente crearía el servicio cuando se inicia su aplicación y luego le enviaría trabajos.

Si prueba la evaluación comparativa de nuevo pero inicialice el servicio como campo, una vez, fuera del ciclo de temporización; luego verás la sobrecarga real de enviar ejecutables al servicio frente a ejecutarlos tú mismo.

Pero no creo que hayas comprendido bien el punto: los ejecutores no están destinados a la eficiencia, están ahí para simplificar la coordinación y la distribución del trabajo a un grupo de subprocesos. Siempre serán menos eficientes que solo invocar el Runnable.run() (ya que al final del día, el servicio del ejecutor aún necesita hacer esto, después de haber hecho un mantenimiento extra de antemano). Es cuando los está usando desde múltiples hilos que necesitan procesamiento asíncrono, que realmente brillan.

También considere que está viendo la diferencia de tiempo relativa de un costo básicamente fijo (la sobrecarga del ejecutor es la misma si sus tareas tardan 1ms o 1hr en ejecutarse) en comparación con una cantidad variable muy pequeña (su ejecución trivial).Si el servicio del ejecutor toma 5 ms adicionales para ejecutar una tarea de 1 ms, esa no es una cifra muy favorable. Si se necesitan 5 ms adicionales para ejecutar una tarea de 5 segundos (por ejemplo, una consulta SQL no trivial), eso es completamente insignificante y merece la pena.

Por lo tanto, hasta cierto punto depende de su situación: si tiene una sección extremadamente crítica, que ejecuta muchas tareas pequeñas, que no necesitan ejecutarse en paralelo o asíncronamente, no obtendrá nada de una Ejecutor. Si está procesando tareas más pesadas en paralelo y desea responder de forma asincrónica (por ejemplo, una aplicación web), los ejecutores son geniales.

Si son la mejor opción para usted, depende de su situación, pero realmente debe probar las pruebas con datos representativos realistas. No creo que sea apropiado extraer conclusiones de las pruebas que has realizado a menos que tus tareas realmente sean tan triviales (y no quieras reutilizar la instancia del ejecutor ...).

+0

Inicializo el ejecutor dentro de un método, pero no dentro del ciclo. Usé métodos simplemente para mantener las pruebas separadas. Sé que los ejecutores tienen sus gastos generales, me sorprendió que fuera tan alto. Desafortunadamente (o afortunadamente), la mayoría de mis cálculos son realmente triviales (aritmética simple), excepto que se hacen en muchos mensajes. Piensa en un sistema de mensajería que maneje una avalancha de mensajes, pero la transformación de cada mensaje no es demasiado costosa. Lo que obtengo de esto es que necesito hacer que mi programa sea concurrente con una granularidad diferente de la que estaba originalmente pensando. – Shahbaz

0

El último objetivo de Fixed ThreadPool es reutilizar los hilos ya creados. Por lo tanto, los aumentos de rendimiento se ven en la falta de la necesidad de recrear un nuevo hilo cada vez que se envía una tarea. Por lo tanto, el tiempo de parada debe tomarse dentro de la tarea enviada. Solo con en la última declaración del método de ejecución.

2

Math.random() realmente se sincroniza en un solo generador de números aleatorios. Llamar a Math.random() da como resultado argumento significativo para el generador de números. De hecho, cuantos más hilos tenga, más lento será.

Desde el javadoc Math.random():

Este método se sincroniza correctamente para permitir el uso correcto por más de un hilo. Sin embargo, si muchos subprocesos necesitan generar números pseudoaleatorios a gran velocidad, puede reducir la contención para cada subproceso a tener su propio generador de números pseudoaleatorios.

0

Debe agrupar de alguna manera la ejecución, a fin de enviar porciones más grandes de computación a cada subproceso (por ejemplo, grupos de compilación basados ​​en el símbolo de stock). Obtuve los mejores resultados en escenarios similares al usar el Disruptor. Tiene una sobrecarga por trabajo muy baja. Aún así, es importante para trabajos en grupo, ingenuo round robin por lo general crea muchas fallas en el caché.

ver http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.html

2

La 'sobrecarga' que mencionas es nada que ver con ExecutorService, que es causada por múltiples hilos Sincronización en la Math.random, la creación de contención de bloqueo.

Así que sí, le falta algo (y la respuesta 'correcta' a continuación no es realmente correcta).

Aquí hay un código Java 8 para demostrar 8 hilos de ejecución de una función simple en el que no hay contención de bloqueo:

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 
import java.util.function.DoubleFunction; 

import com.google.common.base.Stopwatch; 

public class ExecServicePerformance { 

    private static final int repetitions = 120; 
    private static int totalOperations = 250000; 
    private static final int cpus = 8; 
    private static final List<Batch> batches = batches(cpus); 

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000/Math.PI); }; 

    public static void main(String[] args) throws InterruptedException { 

     printExecutionTime("Synchronous", ExecServicePerformance::synchronous); 
     printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches); 
     printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches); 
     printExecutionTime("Executor pool", ExecServicePerformance::executorPool); 

    } 

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException { 
     long time = 0; 
     for (int i = 0; i < repetitions; i++) { 
      Stopwatch stopwatch = Stopwatch.createStarted(); 
      f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread 
      time += stopwatch.elapsed(TimeUnit.MILLISECONDS); 
     } 
     System.out.println(msg + " exec time: " + time); 
    }  

    private static void synchronous() { 
     for (int i = 0; i < totalOperations; i++) { 
      performanceFunc.apply(i); 
     } 
    } 

    private static void synchronousBatches() {  
     for (Batch batch : batches) { 
      batch.synchronously(); 
     } 
    } 

    private static void asynchronousBatches() { 

     CountDownLatch cb = new CountDownLatch(cpus); 

     for (Batch batch : batches) { 
      Runnable r =() -> { batch.synchronously(); cb.countDown(); }; 
      Thread t = new Thread(r); 
      t.start(); 
     } 

     try { 
      cb.await(); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     }   
    } 

    private static void executorPool() { 

     final ExecutorService es = Executors.newFixedThreadPool(cpus); 

     for (Batch batch : batches) { 
      Runnable r =() -> { batch.synchronously(); }; 
      es.submit(r); 
     } 

     es.shutdown(); 

     try { 
      es.awaitTermination(10, TimeUnit.SECONDS); 
     } catch (InterruptedException e) { 
      throw new RuntimeException(e); 
     } 

    } 

    private static List<Batch> batches(final int cpus) { 
     List<Batch> list = new ArrayList<Batch>(); 
     for (int i = 0; i < cpus; i++) { 
      list.add(new Batch(totalOperations/cpus)); 
     } 
     System.out.println("Batches: " + list.size()); 
     return list; 
    } 

    private static class Batch { 

     private final int operationsInBatch; 

     public Batch(final int ops) { 
      this.operationsInBatch = ops; 
     } 

     public void synchronously() { 
      for (int i = 0; i < operationsInBatch; i++) { 
       performanceFunc.apply(i); 
      } 
     } 
    } 


} 

temporizaciones de Resultados para 120 pruebas de 25k operaciones (ms):

  • síncrono tiempo Exec: 9956
  • lotes sincrónicos tiempo Exec: 9900
  • subproceso por lotes tiempo Exec: 2176
  • Executor pool exec time: 1922

Ganador: Executor Service.

1

Aquí hay resultados en mi máquina (OpenJDK 8 de 64 bits de Ubuntu 14.0, Thinkpad W530)

simpleCompuation:6 
computationWithObjCreation:5 
computationWithObjCreationAndExecutors:33 

Ciertamente hay sobrecarga. Pero recuerde cuáles son estos números: milisegundos para 100k iteraciones. En su caso, la sobrecarga fue de aproximadamente 4 microsegundos por iteración. Para mí, la sobrecarga fue de aproximadamente un cuarto de microsegundo.

La sobrecarga es la sincronización, las estructuras de datos internas, y posiblemente la falta de optimización JIT debido a rutas de código complejas (sin duda más complejo que su bucle for).

Las tareas que realmente querría paralelizar valdrían la pena, a pesar de la sobrecarga de un cuarto de segundo.


FYI, esto sería un muy mal cálculo para paralelizar. Aumenté el hilo a 8 (el número de núcleos):

simpleCompuation:5 
computationWithObjCreation:6 
computationWithObjCreationAndExecutors:38 

No lo hizo más rápido. Esto se debe a que Math.random() está sincronizado.

Cuestiones relacionadas