2009-08-24 13 views
14

Necesito enviar una cantidad de tareas y luego esperar hasta que todos los resultados estén disponibles. Cada uno de ellos agrega un String a un Vector (que está sincronizado de manera predeterminada). Luego, debo comenzar una nueva tarea para cada resultado en el Vector, pero necesito hacer esto solo cuando todas las tareas anteriores hayan dejado de hacer su trabajo.ejecutores de Java: espere la finalización de la tarea.

Quiero usar Java Executor, en particular intenté usar Executors.newFixedThreadPool(100) para usar un número fijo de hilo (tengo un número variable de tareas que puede ser 10 o 500) pero soy nuevo con los ejecutores y yo no sé cómo esperar la terminación de la tarea. Esto es algo así como un pseudocódigo de lo que mi programa tiene que hacer:

ExecutorService e = Executors.newFixedThreadPool(100); 
while(true){ 

/*do something*/ 

for(...){ 
<start task> 
} 

<wait for all task termination> 

for each String in result{ 
<start task> 
} 

<wait for all task termination> 
} 

no puedo hacer un e.shutdown porque estoy en un while (true) y necesito volver a utilizar el executorService ..

¿Me puede ayudar? ¿Me puede sugerir una guía/libro sobre ejecutores de Java?

+0

Eche un vistazo a http://stackoverflow.com/questions/1228433/java-parallel-work-iterator/1228445 – Tim

Respuesta

21

El ExecutorService le ofrece un mecanismo para ejecutar múltiples tareas simultáneamente y obtener una colección de objetos Future de vuelta (que representa el cálculo asincrónico de la tarea).

Collection<Callable<?>> tasks = new LinkedList<Callable<?>>(); 
//populate tasks 
for (Future<?> f : executorService.invokeAll(tasks)) { //invokeAll() blocks until ALL tasks submitted to executor complete 
    f.get(); 
} 

Si tiene Runnable s en lugar de Callable s, se puede convertir fácilmente una Runnable en un Callable<Object> usando el método:

Callable<?> c = Executors.callable(runnable); 
+4

Lo siento por desenterrar una publicación anterior, pero no veo el sentido de llamar a f.get(); invokeAll() es en sí mismo el bloqueo, por lo que no hay necesidad de llamar a get() a menos que esté interesado en el resultado (que su código no es). – hooch

+0

@hooch: IMO siempre debe llamar a 'get()' en caso de que su 'Callable' lanzara una excepción. De lo contrario, simplemente se pierde. –

2

Cuando se envía a un servicio de ejecutor, obtendrá un objeto Future.

Almacene esos objetos en una colección, y luego llame al get() en cada uno sucesivamente. get() bloques hasta que se complete el trabajo subyacente, por lo que el resultado es que la llamada get() en cada uno se completará una vez que hayan finalizado todos los trabajos subyacentes.

p. Ej.

Collection<Future> futures = ... 
for (Future f : futures) { 
    Object result = f.get(); 
    // maybe do something with the result. This could be a 
    // genericised Future<T> 
} 
System.out.println("Tasks completed"); 

Una vez que todos estos se hayan completado, comience su segunda presentación. Tenga en cuenta que este podría no ser un uso óptimo de su grupo de subprocesos, ya que se volverá inactivo y, a continuación, volverá a poblarlo. Si es posible intente mantenerlo ocupado haciendo cosas.

+0

¿Cuál es la diferencia entre e.submit (creo que necesito usar esto siguiendo su ejemplo) y e .¿¿ejecutar?? – Raffo

+0

La diferencia es que con submit obtienes un Future y con execute no. Si usa su propia 'ThreadFactory' con un' UncaughtExceptionHandler', entonces 'execute' hará que el controlador reciba cualquier excepción no detectada, mientras que' submit' no lo hará - usted solo obtendría las excepciones a través del 'Future''s' get 'Método –

14

¿Me puede sugerir una guía/libro sobre java ejecutores ??

puedo responder a esta parte:

Java Concurrency in Practice por Brian Goetz (con Tim Peierls, Joshua Bloch, Joseph Bowbeer, David Holmes y Doug Lea) es más probable que su mejor opción.

No es solamente sobre ejecutores sin embargo, sino que abarca java.util.concurrent paquete en general, así como los conceptos y técnicas básicas de concurrencia, y algunos temas avanzados como el modelo de memoria de Java.

+0

Este es un excelente libro, aunque en realidad no es para principiantes. –

14

En lugar de someterse Runnable s o s Callable a un Executor directamente y almacenar los valores de retorno correspondientes Future me gustaría recomendar el uso de una aplicación CompletionService para recuperar cada Futurecuando finaliza. Este enfoque desacopla la producción de tareas del consumo de tareas completadas, lo que permite, por ejemplo, que nuevas tareas se originen en un hilo productor durante un período de tiempo.

Collection<Callable<Result>> workItems = ... 
ExecutorService executor = Executors.newSingleThreadExecutor(); 
CompletionService<Result> compService = new ExecutorCompletionService<Result>(executor); 

// Add work items to Executor. 
for (Callable<Result> workItem : workItems) { 
    compService.submit(workItem); 
} 

// Consume results as they complete (this would typically occur on a different thread). 
for (int i=0; i<workItems.size(); ++i) { 
    Future<Result> fut = compService.take(); // Will block until a result is available. 
    Result result = fut.get(); // Extract result; this will not block. 
} 
+0

En la práctica esto implica más LOC que mi ejemplo anterior. Un CompletionService es generalmente útil si envía tareas desde muchas ubicaciones pero desea manejar las finalizaciones de tareas de manera consistente (por ejemplo, para realizar otro cálculo), que solo quiere definir una vez –

+1

@oxbow: O si desea comenzar a procesar los resultados tan pronto como se complete la primera tarea! De lo contrario, podría estar esperando su tarea más lenta mientras los demás ya están listos ... (Adamski +1) – Tim

+1

@Tim - El OP dijo claramente que quería esperar hasta que todas las tareas estuvieran terminadas, por lo que no importa (aparte de unos pocos nanosegundos) cuya tarea finaliza primero. –

1
ExecutorService executor = ... 
//submit tasks 
executor.shutdown(); // previously submitted tasks are executed, 
        // but no new tasks will be accepted 
while(!executor.awaitTermination(1, TimeUnit.SECONDS)) 
    ; 

No hay manera fácil de hacer lo que quiera sin crear ExecutorService personalizado.

Cuestiones relacionadas