2012-06-07 40 views
25

Mi pregunta: ¿Cómo ejecutar un grupo de objetos con hebras en un ThreadPoolExecutor y esperar a que todos terminen antes de continuar?Cómo esperar a que un ThreadPoolExecutor termine

Soy nuevo en ThreadPoolExecutor. Este código es una prueba para saber cómo funciona. En este momento ni siquiera llene el BlockingQueue con los objetos porque no entiendo cómo iniciar la cola sin llamar a execute() con otro RunnableObject. De todos modos, ahora solo llamo al awaitTermination() pero creo que aún me falta algo. ¡Cualquier tip estaría genial! Gracias.

public void testThreadPoolExecutor() throws InterruptedException { 
    int limit = 20; 
    BlockingQueue q = new ArrayBlockingQueue(limit); 
    ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
    for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
    } 
    ex.awaitTermination(2, TimeUnit.SECONDS); 
    System.out.println("finished"); 
} 

La clase RunnableObject:

package playground; 

public class RunnableObject implements Runnable { 

    private final int id; 

    public RunnableObject(int id) { 
    this.id = id; 
    } 

    @Override 
    public void run() { 
    System.out.println("ID: " + id + " started"); 
    try { 
     Thread.sleep(2354); 
    } catch (InterruptedException ignore) { 
    } 
    System.out.println("ID: " + id + " ended"); 
    } 
} 
+1

no creo que es una buena idea para responder a su pregunta inmediatamente después de que lo puesto y sugieren que su propia respuesta no es del todo adecuada. En el mejor de los casos, eso garantiza y actualiza su pregunta original explicando por qué esa respuesta no es suficiente. – Kiril

+1

@Link, tienes razón. Lo arreglaré. – kentcdodds

+0

Acabo de editar mi respuesta para mostrar una forma de esperar a que terminen todos los trabajos antes de cerrar el ejecutor. – Gray

Respuesta

43

Usted debe reproducirse en awaitTermination

ExecutorService threads; 
// ... 
// Tell threads to finish off. 
threads.shutdown(); 
// Wait for everything to finish. 
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) { 
    log.info("Awaiting completion of threads."); 
} 
+0

Eso lo hizo totalmente. No noté que 'awaitTermination' devuelve nada. Nadie más mencionó que necesitaría recorrerlo para bloquear. ¡Gracias! – kentcdodds

+9

¿Por qué hacerlo en un bucle? ¿Por qué no aumentar el número de segundos para esperar? Por lo general, hago 'awaitTermination (Long.MAX_VALUE, TimeUnit.SECONDS)' o algo así. – Gray

+1

@Gray - Inicialmente lo llamé una vez en un código mío y o cosas realmente extrañas comenzaron a suceder en la salida a veces o encerrado todo. Finalmente lo rastreé hasta esto, así que ahora hago un bucle y registro un mensaje de "espera", así sé que algo extraño está sucediendo. – OldCurmudgeon

4

Su problema parece ser que no está llamando shutdown después de haber presentado todos los puestos de trabajo a su piscina. Sin shutdown() su awaitTermination siempre devolverá falso.

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 
// you are missing this line!! 
ex.shutdown(); 
ex.awaitTermination(2, TimeUnit.SECONDS); 

También puede hacer algo como lo siguiente que esperar a que todos sus trabajos para terminar:

List<Future<Object>> futures = new ArrayList<Future<Object>>(); 
for (int i = 0; i < limit; i++) { 
    futures.add(ex.submit(new RunnableObject(i + 1), (Object)null)); 
} 
for (Future<Object> future : futures) { 
    // this joins with the submitted job 
    future.get(); 
} 
... 
// still need to shutdown at the end 
ex.shutdown(); 

Además, debido a que usted está durmiendo para 2354 milisegundos, pero sólo a la espera de la terminación de la totalidad de la trabajos para 2SECONDS, awaitTermination siempre devolverá false.

Por último, parece que está preocupado por crear un nuevo ThreadPoolExecutor y en su lugar desea reutilizar el primero. No seas. La sobrecarga del GC será extremadamente mínima en comparación con cualquier código que escriba para detectar si los trabajos están terminados.


citar el javadocs, ThreadPoolExecutor.shutdown():

inicia un cierre ordenado en el que se ejecutan las tareas presentadas anteriormente, pero no se aceptarán nuevas tareas. La invocación no tiene ningún efecto adicional si ya se ha cerrado.

En el método ThreadPoolExecutor.awaitTermination(...), se está a la espera para el estado del ejecutor para ir a TERMINATED. Pero primero el estado debe ir al SHUTDOWN si se llama shutdown() o STOP si se llama al shutdownNow().

3

No tiene nada que ver con el ejecutor. Simplemente use la interfaz java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>). Bloqueará hasta que finalicen todos los Callable s.

Los ejecutores deben ser duraderos; más allá de la vida de un grupo de tareas. shutdown es para cuando la aplicación está terminada y limpiando.

+0

¿Podría explicar esta respuesta? Me está gustando la respuesta, pero estoy teniendo dificultades para implementarla. Gracias – kentcdodds

-1

Prueba de esto,

ThreadPoolExecutor ex = 
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q); 
for (int i = 0; i < limit; i++) { 
    ex.execute(new RunnableObject(i + 1)); 
} 

líneas que se añade

ex.shutdown(); 
ex.awaitTermination(timeout, unit) 
+0

debe verificar el booleano devuelto de esperar la terminación y seguir intentándolo hasta que se cumpla. –

1

Otro enfoque es utilizar CompletionService, muy útil si tiene que Intente cualquier resultado de la tarea:

//run 3 task at time 
final int numParallelThreads = 3; 

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor 
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads); 
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 

int numTaskToStart = 15; 

for(int i=0; i<numTaskToStart ; i++){ 
    //task class that implements Callable<String> (or something you need) 
    MyTask mt = new MyTask(); 

    completionService.submit(mt); 
} 

executor.shutdown(); //it cannot be queued more task 

try { 
    for (int t = 0; t < numTaskToStart ; t++) { 
     Future<String> f = completionService.take(); 
     String result = f.get(); 
     // ... something to do ... 
    } 
} catch (InterruptedException e) { 
    //termination of all started tasks (it returns all not started tasks in queue) 
    executor.shutdownNow(); 
} catch (ExecutionException e) { 
    // ... something to catch ... 
} 
2

Aquí es una variante de la respuesta aceptada que se encarga de reintentos si/cuando InterruptedException se lanza:

executor.shutdown(); 

boolean isWait = true; 

while (isWait) 
{ 
    try 
    {    
     isWait = !executor.awaitTermination(10, TimeUnit.SECONDS); 
     if (isWait) 
     { 
      log.info("Awaiting completion of bulk callback threads."); 
     } 
    } catch (InterruptedException e) { 
     log.debug("Interruped while awaiting completion of callback threads - trying again..."); 
    } 
} 
Cuestiones relacionadas