2011-02-06 7 views
61

Acabo de encontrar CompletionService en this blog post. Sin embargo, esto realmente no muestra las ventajas de CompletionService sobre un ExecutorService estándar. El mismo código se puede escribir con cualquiera. Entonces, ¿cuándo es útil un CompletionService?¿Cuándo debería usar un CompletionService en un ExecutorService?

¿Se puede dar una muestra de código corto para que sea claro como el cristal? Por ejemplo, este ejemplo de código demuestra que no se necesite un CompletionService (= equivalente a ExecutorService)

ExecutorService taskExecutor = Executors.newCachedThreadPool(); 
    //  CompletionService<Long> taskCompletionService = 
    //    new ExecutorCompletionService<Long>(taskExecutor); 
    Callable<Long> callable = new Callable<Long>() { 
     @Override 
     public Long call() throws Exception { 
      return 1L; 
     } 
    }; 

    Future<Long> future = // taskCompletionService.submit(callable); 
     taskExecutor.submit(callable); 

    while (!future.isDone()) { 
     // Do some work... 
     System.out.println("Working on something..."); 
    } 
    try { 
     System.out.println(future.get()); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } catch (ExecutionException e) { 
     e.printStackTrace(); 
    } 

Respuesta

78

Con ExecutorService, una vez que haya presentado las tareas de ejecutar, necesita codificar manualmente para obtener de manera eficiente los resultados de las tareas completadas. Con CompletionService, esto está prácticamente automatizado. La diferencia no es muy evidente en el código que ha presentado porque está enviando solo una tarea. Sin embargo, imagine que tiene una lista de tareas para enviar. En el siguiente ejemplo, se envían múltiples tareas al CompletionService. Luego, en lugar de tratar de averiguar qué tarea se ha completado (para obtener los resultados), solo le pide a la instancia de CompletionService que devuelva los resuts a medida que estén disponibles.

public class CompletionServiceTest { 

     class CalcResult { 
      long result ; 

      CalcResult(long l){ 
       result = l; 
      } 
     } 

     class CallableTask implements Callable<CalcResult> { 
      String taskName ; 
      long input1 ; 
      int input2 ; 

      CallableTask(String name , long v1 , int v2){ 
       taskName = name; 
       input1 = v1; 
       input2 = v2 ; 
      } 

      public CalcResult call() throws Exception { 
       System.out.println(" Task " + taskName + " Started -----"); 
       for(int i=0;i<input2 ;i++){ 
        try { 
         Thread.sleep(200); 
        } catch (InterruptedException e) { 
         System.out.println(" Task " + taskName + " Interrupted !! "); 
         e.printStackTrace(); 
        } 
        input1 += i; 
       } 
       System.out.println(" Task " + taskName + " Completed @@@@@@"); 
       return new CalcResult(input1) ; 
      } 

     } 

     public void test(){ 
      ExecutorService taskExecutor = Executors.newFixedThreadPool(3); 
       CompletionService<CalcResult> taskCompletionService = 
        new ExecutorCompletionService<CalcResult>(taskExecutor); 

      int submittedTasks = 5; 
      for(int i=0;i< submittedTasks;i++){ 
       taskCompletionService.submit(new CallableTask(
         String.valueOf(i), 
         (i * 10), 
         ((i * 10) + 10 ) 
         )); 
       System.out.println("Task " + String.valueOf(i) + "subitted"); 
      } 
      for(int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++){ 
       try { 
        System.out.println("trying to take from Completion service"); 
        Future<CalcResult> result = taskCompletionService.take(); 
        System.out.println("result for a task availble in queue.Trying to get()" ); 
        // above call blocks till atleast one task is completed and results availble for it 
        // but we dont have to worry which one 

        // process the result here by doing result.get() 
        CalcResult l = result.get(); 
        System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result)); 

       } catch (InterruptedException e) { 
        // Something went wrong with a task submitted 
        System.out.println("Error Interrupted exception"); 
        e.printStackTrace(); 
       } catch (ExecutionException e) { 
        // Something went wrong with the result 
        e.printStackTrace(); 
        System.out.println("Error get() threw exception"); 
       } 
      } 
     } 
    } 
+5

para otro ejemplo vea Java Concurrency in Practice pág. 130. Allí se usa un CompletionService para renderizar imágenes a medida que estén disponibles. – Pete

+0

¿Es seguro asumir que 'take' y' poll' en CompletionService son seguros para subprocesos? En su ejemplo, las tareas todavía se están ejecutando cuando se invoca 'take()' por primera vez, y no veo ninguna sincronización explícita. – raffian

+0

'take()' es seguro para hilos. Puedes leer en los JavaDocs pero básicamente 'take()' esperará el siguiente resultado completado y luego lo devolverá. El 'CompletionService' funciona con' BlockingQueue' para la salida. –

10

creo que el javadoc mejor responde a la pregunta de cuándo el CompletionService es útil en una forma en que un ExecutorService no lo es.

Un servicio que desacopla la producción de nuevas tareas asíncronas del consumo de los resultados de las tareas completadas.

Básicamente, esta interfaz permite que un programa tenga productores que creen y presenten tareas (e incluso examinen los resultados de esas presentaciones) sin conocer ningún otro consumidor de los resultados de esas tareas. Mientras tanto, los consumidores que conocen el CompletionService pueden obtener poll o take resultados sin tener conocimiento de los productores que envían las tareas.

Para el registro, y podría estar equivocado porque es bastante tarde, pero estoy bastante seguro de que el código de muestra en esa publicación de blog provoca una pérdida de memoria. Sin un consumidor activo que saque resultados de la cola interna de ExecutorCompletionService, no estoy seguro de cómo el blogger esperaba que la cola se agotara.

7

Básicamente, usa un CompletionService si desea ejecutar múltiples tareas en paralelo y luego trabajar con ellas en su orden de finalización. Entonces, si ejecuto 5 trabajos, el CompletionService me dará el primero que termine. El ejemplo en el que solo hay una tarea no confiere ningún valor adicional en un Executor aparte de la capacidad de enviar un Callable.

120

Omitir muchos detalles:

  • ExecutorService = entrantes de cola + subprocesos de trabajo de cola
  • CompletionService = hilos de cola + trabajadores de entrada + salida
4

En primer lugar, si no queremos perder el tiempo de procesador, no vamos a utilizar

while (!future.isDone()) { 
     // Do some work... 
} 

Debemos usar

service.shutdown(); 
service.awaitTermination(14, TimeUnit.DAYS); 

Lo malo de este código es que se cerrará ExecutorService. Si queremos continuar trabajando con él (es decir, tenemos alguna creación de tareas recursivas), tenemos dos alternativas: invokeAll o ExecutorService.

invokeAll esperará hasta que se completen todas las tareas. ExecutorService nos otorga la capacidad de tomar o sondear los resultados uno por uno.

Y, finily, ejemplo recursivo:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER); 
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); 

while (Tasks.size() > 0) { 
    for (final Task task : Tasks) { 
     completionService.submit(new Callable<String>() { 
      @Override 
      public String call() throws Exception { 
       return DoTask(task); 
      } 
     }); 
    } 

    try {     
     int taskNum = Tasks.size(); 
     Tasks.clear(); 
     for (int i = 0; i < taskNum; ++i) { 
      Result result = completionService.take().get(); 
      if (result != null) 
       Tasks.add(result.toTask()); 
     }   
    } catch (InterruptedException e) { 
    // error :(
    } catch (ExecutionException e) { 
    // error :(
    } 
} 
1

Digamos que tiene 5 tareas de ejecución larga (tarea invocable) y ha enviado esas tareas al servicio de ejecución. Ahora imagina que no quieres esperar a que compitan las 5 tareas, sino que quieres hacer algún tipo de procesamiento en estas tareas si las completas. Ahora esto se puede hacer escribiendo lógica de sondeo en objetos futuros o usando esta API.

Cuestiones relacionadas