13

Tengo el siguiente fragmento de código que, básicamente, explora la lista de tareas que se deben ejecutar y cada tarea se entrega al ejecutor para su ejecución.Manejo de excepciones para ThreadPoolExecutor

El JobExecutor a su vez crea otro ejecutor (para hacer cosas db ... leer y escribir datos en la cola) y completa la tarea.

JobExecutor devuelve un Future<Boolean> para las tareas enviadas. Cuando una de las tareas falla, quiero interrumpir con gracia todos los hilos y apagar el ejecutor al capturar todas las excepciones. ¿Qué cambios necesito hacer?

public class DataMovingClass { 
    private static final AtomicInteger uniqueId = new AtomicInteger(0); 

    private static final ThreadLocal<Integer> uniqueNumber = new IDGenerator(); 

    ThreadPoolExecutor threadPoolExecutor = null ; 

    private List<Source> sources = new ArrayList<Source>(); 

    private static class IDGenerator extends ThreadLocal<Integer> { 
     @Override 
     public Integer get() { 
      return uniqueId.incrementAndGet(); 
     } 
    } 

    public void init(){ 

    // load sources list 

    } 

    public boolean execute() { 

    boolean succcess = true ; 
    threadPoolExecutor = new ThreadPoolExecutor(10,10, 
       10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
       new ThreadFactory() { 
        public Thread newThread(Runnable r) { 
         Thread t = new Thread(r); 
         t.setName("DataMigration-" + uniqueNumber.get()); 
         return t; 
        }// End method 
       }, new ThreadPoolExecutor.CallerRunsPolicy()); 

    List<Future<Boolean>> result = new ArrayList<Future<Boolean>>(); 

    for (Source source : sources) { 
        result.add(threadPoolExecutor.submit(new JobExecutor(source))); 
    } 

    for (Future<Boolean> jobDone : result) { 
       try { 
        if (!jobDone.get(100000, TimeUnit.SECONDS) && success) { 
         // in case of successful DbWriterClass, we don't need to change 
         // it. 
         success = false; 
        } 
       } catch (Exception ex) { 
        // handle exceptions 
       } 
      } 

    } 

    public class JobExecutor implements Callable<Boolean> { 

     private ThreadPoolExecutor threadPoolExecutor ; 
     Source jobSource ; 
     public SourceJobExecutor(Source source) { 
      this.jobSource = source; 
      threadPoolExecutor = new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1024), 
        new ThreadFactory() { 
         public Thread newThread(Runnable r) { 
          Thread t = new Thread(r); 
          t.setName("Job Executor-" + uniqueNumber.get()); 
          return t; 
         }// End method 
        }, new ThreadPoolExecutor.CallerRunsPolicy()); 
     } 

     public Boolean call() throws Exception { 
      boolean status = true ; 
      System.out.println("Starting Job = " + jobSource.getName()); 
      try { 

         // do the specified task ; 


      }catch (InterruptedException intrEx) { 
       logger.warn("InterruptedException", intrEx); 
       status = false ; 
      } catch(Exception e) { 
       logger.fatal("Exception occurred while executing task "+jobSource.getName(),e); 
       status = false ; 
      } 
      System.out.println("Ending Job = " + jobSource.getName()); 
      return status ; 
     } 
    } 
} 

Respuesta

14

Cuando se envía una tarea al ejecutor, le devuelve una instancia de FutureTask.

FutureTask.get() volverá a lanzar cualquier excepción lanzada por la tarea como ExecutorException.

Por lo tanto, cuando itere a través del List<Future> y llame a cada uno, tome ExecutorException e invoque un cierre ordenado.

+0

bien ... ¿ves algún otro defecto o lugar en el que deba manejar las excepciones? – jagamot

1

Subclase ThreadPoolExecutor y anule su método protected afterExecute (Runnable r, Throwable t).

Si está creando un grupo de subprocesos a través de la clase de conveniencia java.util.concurrent.Executors (que no es), consulte su fuente para ver cómo se invoca ThreadPoolExecutor.

0

Dado que está enviando tareas al ThreadPoolExecutor, las excepciones son tragadas por FutureTask.

Tener un vistazo a este code

**Inside FutureTask$Sync** 

void innerRun() { 
    if (!compareAndSetState(READY, RUNNING)) 
     return; 

    runner = Thread.currentThread(); 
    if (getState() == RUNNING) { // recheck after setting thread 
     V result; 
     try { 
      result = callable.call(); 
     } catch (Throwable ex) { 
      setException(ex); 
      return; 
     } 
     set(result); 
    } else { 
     releaseShared(0); // cancel 
    } 

}

protected void setException(Throwable t) { 
    sync.innerSetException(t); 
} 

Desde arriba de código, es evidente que setException método de captura Throwable.Debido a esta razón, FutureTask está tragando todas las excepciones si se utiliza el método "submit()" en ThreadPoolExecutor

Según java documentation, puede extender afterExecute() método en el ThreadPoolExecutor

protected void afterExecute(Runnable r, 
          Throwable t) 

Código de la muestra de acuerdo con la documentación:

class ExtendedExecutor extends ThreadPoolExecutor { 
    // ... 
    protected void afterExecute(Runnable r, Throwable t) { 
    super.afterExecute(r, t); 
    if (t == null && r instanceof Future<?>) { 
     try { 
     Object result = ((Future<?>) r).get(); 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
    } 
    if (t != null) 
     System.out.println(t); 
    } 
} 

Usted puede coger Exceptions de tres maneras

  1. Future.get() como se sugiere en respuesta aceptada
  2. envoltura toda run() o call() método en try{}catch{}Exceptoion{} bloques
  3. de anulación afterExecute de ThreadPoolExecutor método como se muestra arriba

Para graciosamente interrumpir otros hilos, echar un vistazo a continuación SE pregunta:

How to stop next thread from running in a ScheduledThreadPoolExecutor

How to forcefully shutdown java ExecutorService