167

Estoy tratando de usar la clase ThreadPoolExecutor de Java para ejecutar una gran cantidad de tareas pesadas con un número fijo de hilos. Cada una de las tareas tiene muchos lugares durante los cuales puede fallar debido a excepciones.Manejando excepciones de tareas de Java ExecutorService

He subclasificado ThreadPoolExecutor y he reemplazado el método afterExecute que se supone que proporciona las excepciones no detectadas encontradas al ejecutar una tarea. Sin embargo, parece que no puedo hacer que funcione.

Por ejemplo:

public class ThreadPoolErrors extends ThreadPoolExecutor { 
    public ThreadPoolErrors() { 
     super( 1, // core threads 
       1, // max threads 
       1, // timeout 
       TimeUnit.MINUTES, // timeout units 
       new LinkedBlockingQueue<Runnable>() // work queue 
     ); 
    } 

    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     if(t != null) { 
      System.out.println("Got an error: " + t); 
     } else { 
      System.out.println("Everything's fine--situation normal!"); 
     } 
    } 

    public static void main(String [] args) { 
     ThreadPoolErrors threadPool = new ThreadPoolErrors(); 
     threadPool.submit( 
       new Runnable() { 
        public void run() { 
         throw new RuntimeException("Ouch! Got an error."); 
        } 
       } 
     ); 
     threadPool.shutdown(); 
    } 
} 

La salida de este programa es "Todo está bien - situación normal!" aunque el único Runnable enviado al grupo de subprocesos arroja una excepción. ¿Alguna pista de lo que está pasando aquí?

Gracias!

Respuesta

121

Desde el docs:

Nota: Cuando las acciones están encerrados en tareas (como FutureTask) explícitamente o a través de métodos como enviar, estos objetos de tarea atrapar y mantener excepciones de cálculo, y por lo que n ot causa abrupta terminación , y las excepciones internas no se pasan a este método .

Cuando envía un Runnable, queda envuelto en un futuro.

Su afterExecute debe ser algo como esto:

protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     if (t == null && r instanceof Future<?>) { 
     try { 
      Future<?> future = (Future<?>) r; 
      if (future.isDone()) { 
      future.get(); 
      } 
     } catch (CancellationException ce) { 
      t = ce; 
     } catch (ExecutionException ee) { 
      t = ee.getCause(); 
     } catch (InterruptedException ie) { 
      Thread.currentThread().interrupt(); // ignore/reset 
     } 
     } 
     if (t != null) { 
      System.out.println(t); 
     } 
} 
+4

Gracias, terminé usando esta solución. Además, en caso de que alguien esté interesado: otros han sugerido no subclasificar el ExecutorService, pero lo hice de todos modos porque quería supervisar las tareas mientras finalizaban en lugar de esperar que todas terminaran y luego llamar a get() en todas las Futuros devueltos. – Tom

+1

Otro enfoque para crear subclases del ejecutor es subclase FutureTask y anular su método 'hecho' – nos

+1

Tom >> ¿Puede publicar el código de fragmento de muestra donde ha subclase ExecutorService para supervisar las tareas mientras completan ... – jagamot

194

ADVERTENCIA: Cabe señalar que esta solución bloqueará el hilo de llamada.


Si desea procesar las excepciones producidas por la tarea, entonces es generalmente mejor utilizar Callable en lugar de Runnable.

Callable.call() se le permite lanzar excepciones controladas, y estos se propaga de nuevo al subproceso de llamada:

Callable task = ... 
Future future = executor.submit(task); 
try { 
    future.get(); 
} catch (ExecutionException ex) { 
    ex.getCause().printStackTrace(); 
} 

Si Callable.call() se produce una excepción, esto será envuelto en una ExecutionException y arrojado por Future.get().

Esto es probable que sea mucho mejor que la subclasificación ThreadPoolExecutor. También le da la oportunidad de volver a enviar la tarea si la excepción es recuperable.

+19

Esta debe ser la respuesta aceptada en mi humilde opinión. – Timo

+2

De acuerdo. Esta debería ser la respuesta aceptada. – Joeblackdev

+3

_> Callable.call() permite arrojar excepciones comprobadas, y éstas se propagan de nuevo a la cadena de llamada: _ Tenga en cuenta que la excepción lanzada se propagará a la cadena de llamada solamente si 'future.get()' o su se llama la versión sobrecargada. – nhylated

-5

En lugar de ThreadPoolExecutor subclases, me dotarla de una instancia ThreadFactory que crea nuevos temas y les proporciona una UncaughtExceptionHandler

+3

Intenté esto también, pero el método uncaughtException nunca parece ser llamado. Creo que esto se debe a que un hilo de trabajo en la clase ThreadPoolExecutor está captando las excepciones. – Tom

+5

El método uncaughtException no se llama porque el método de envío ExecutorService está envolviendo el Callable/Runnable en un futuro; la excepción está siendo capturada allí. –

+0

por favor proporcione un código y no solo enlaces! –

13

La explicación de este comportamiento es justo en el javadoc for afterExecute:

Nota: Cuando las acciones están encerrados en tareas (como FutureTask) ya sea explícitamente o a través de métodos como enviar, estos objetos de tareas atrapan y mantienen excepciones de cálculo, y para que lo hagan n ot causa abrupta terminación , y las excepciones internas no se pasan a este método .

6

estoy usando VerboseRunnable clase de jcabi-log, que se traga todas las excepciones y los registra. Muy conveniente, por ejemplo:

import com.jcabi.log.VerboseRunnable; 
scheduler.scheduleWithFixedDelay(
    new VerboseRunnable(
    Runnable() { 
     public void run() { 
     // the code, which may throw 
     } 
    }, 
    true // it means that all exceptions will be swallowed and logged 
), 
    1, 1, TimeUnit.MILLISECONDS 
); 
7

Lo envolví envolviendo el ejecutable suministrado enviado al ejecutor.

CompletableFuture.runAsync(

     () -> { 
       try { 
         runnable.run(); 
       } catch (Throwable e) { 
         Log.info(Concurrency.class, "runAsync", e); 
       } 
     }, 

     executorService 
); 
1

Si su ExecutorService proviene de una fuente externa (es decir, no es posible subclase ThreadPoolExecutor y anular afterExecute()), se puede utilizar un proxy dinámico para lograr el comportamiento deseado:

public static ExecutorService errorAware(final ExecutorService executor) { 
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), 
      new Class[] {ExecutorService.class}, 
      (proxy, method, args) -> { 
       if (method.getName().equals("submit")) { 
        final Object arg0 = args[0]; 
        if (arg0 instanceof Runnable) { 
         args[0] = new Runnable() { 
          @Override 
          public void run() { 
           final Runnable task = (Runnable) arg0; 
           try { 
            task.run(); 
            if (task instanceof Future<?>) { 
             final Future<?> future = (Future<?>) task; 

             if (future.isDone()) { 
              try { 
               future.get(); 
              } catch (final CancellationException ce) { 
               // Your error-handling code here 
               ce.printStackTrace(); 
              } catch (final ExecutionException ee) { 
               // Your error-handling code here 
               ee.getCause().printStackTrace(); 
              } catch (final InterruptedException ie) { 
               Thread.currentThread().interrupt(); 
              } 
             } 
            } 
           } catch (final RuntimeException re) { 
            // Your error-handling code here 
            re.printStackTrace(); 
            throw re; 
           } catch (final Error e) { 
            // Your error-handling code here 
            e.printStackTrace(); 
            throw e; 
           } 
          } 
         }; 
        } else if (arg0 instanceof Callable<?>) { 
         args[0] = new Callable<Object>() { 
          @Override 
          public Object call() throws Exception { 
           final Callable<?> task = (Callable<?>) arg0; 
           try { 
            return task.call(); 
           } catch (final Exception e) { 
            // Your error-handling code here 
            e.printStackTrace(); 
            throw e; 
           } catch (final Error e) { 
            // Your error-handling code here 
            e.printStackTrace(); 
            throw e; 
           } 
          } 
         }; 
        } 
       } 
       return method.invoke(executor, args); 
      }); 
} 
3

Otra solución sería utilice ManagedTask y ManagedTaskListener.

Se necesita un rescatable o Ejecutable que implementa la interfaz ManagedTask.

El método getManagedTaskListener devuelve la instancia que desee.

public ManagedTaskListener getManagedTaskListener() { 

Y se implementa en ManagedTaskListener la taskDone método:

@Override 
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) { 
    if (exception != null) { 
     LOGGER.log(Level.SEVERE, exception.getMessage()); 
    } 
} 

Más detalles sobre managed task lifecycle and listener.

0

Esto es debido a AbstractExecutorService :: submit está terminando su runnable en RunnableFuture (nada más que FutureTask), como a continuación

AbstractExecutorService.java 

public Future<?> submit(Runnable task) { 
    if (task == null) throw new NullPointerException(); 
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE//////// 
    execute(ftask); 
    return ftask; 
} 

Entonces execute lo pasará a Worker y Worker.run() llamará a los de abajo.

ThreadPoolExecutor.java 

final void runWorker(Worker w) { 
    Thread wt = Thread.currentThread(); 
    Runnable task = w.firstTask; 
    w.firstTask = null; 
    w.unlock(); // allow interrupts 
    boolean completedAbruptly = true; 
    try { 
     while (task != null || (task = getTask()) != null) { 
      w.lock(); 
      // If pool is stopping, ensure thread is interrupted; 
      // if not, ensure thread is not interrupted. This 
      // requires a recheck in second case to deal with 
      // shutdownNow race while clearing interrupt 
      if ((runStateAtLeast(ctl.get(), STOP) || 
       (Thread.interrupted() && 
        runStateAtLeast(ctl.get(), STOP))) && 
       !wt.isInterrupted()) 
       wt.interrupt(); 
      try { 
       beforeExecute(wt, task); 
       Throwable thrown = null; 
       try { 
        task.run();   /////////HERE//////// 
       } catch (RuntimeException x) { 
        thrown = x; throw x; 
       } catch (Error x) { 
        thrown = x; throw x; 
       } catch (Throwable x) { 
        thrown = x; throw new Error(x); 
       } finally { 
        afterExecute(task, thrown); 
       } 
      } finally { 
       task = null; 
       w.completedTasks++; 
       w.unlock(); 
      } 
     } 
     completedAbruptly = false; 
    } finally { 
     processWorkerExit(w, completedAbruptly); 
    } 
} 

Finalmente task.run(); en la anterior llamada codificada llamará FutureTask.run().Aquí está el código del controlador de excepción, debido a esto NO está obteniendo la excepción esperada.

class FutureTask<V> implements RunnableFuture<V> 

public void run() { 
    if (state != NEW || 
     !UNSAFE.compareAndSwapObject(this, runnerOffset, 
            null, Thread.currentThread())) 
     return; 
    try { 
     Callable<V> c = callable; 
     if (c != null && state == NEW) { 
      V result; 
      boolean ran; 
      try { 
       result = c.call(); 
       ran = true; 
      } catch (Throwable ex) { /////////HERE//////// 
       result = null; 
       ran = false; 
       setException(ex); 
      } 
      if (ran) 
       set(result); 
     } 
    } finally { 
     // runner must be non-null until state is settled to 
     // prevent concurrent calls to run() 
     runner = null; 
     // state must be re-read after nulling runner to prevent 
     // leaked interrupts 
     int s = state; 
     if (s >= INTERRUPTING) 
      handlePossibleCancellationInterrupt(s); 
    } 
} 
0

Si desea supervisar la ejecución de la tarea, se puede girar 1 o 2 hilos (tal vez más dependiendo de la carga) y utilizarlos para tomar las tareas de un envoltorio ExecutionCompletionService.

0

Esto funciona

  • Se deriva de SingleThreadExecutor, pero se puede adaptar fácilmente
  • Java 8 lamdas código, pero fácil de solucionar

Se va a crear un ejecutor con una sola hilo, que puede obtener muchas tareas; y esperará a que el actual, hasta la ejecución final para comenzar con la siguiente

En caso de error o excepción uncaugth la uncaughtExceptionHandler lo cogerá

 
public final class SingleThreadExecutorWithExceptions { 

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { 

     ThreadFactory factory = (Runnable runnable) -> { 
      final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); 
      newThread.setUncaughtExceptionHandler((final Thread caugthThread,final Throwable throwable) -> { 
       uncaughtExceptionHandler.uncaughtException(caugthThread, throwable); 
      }); 
      return newThread; 
     }; 
     return new FinalizableDelegatedExecutorService 
       (new ThreadPoolExecutor(1, 1, 
         0L, TimeUnit.MILLISECONDS, 
         new LinkedBlockingQueue(), 
         factory){ 


        protected void afterExecute(Runnable runnable, Throwable throwable) { 
         super.afterExecute(runnable, throwable); 
         if (throwable == null && runnable instanceof Future) { 
          try { 
           Future future = (Future) runnable; 
           if (future.isDone()) { 
            future.get(); 
           } 
          } catch (CancellationException ce) { 
           throwable = ce; 
          } catch (ExecutionException ee) { 
           throwable = ee.getCause(); 
          } catch (InterruptedException ie) { 
           Thread.currentThread().interrupt(); // ignore/reset 
          } 
         } 
         if (throwable != null) { 
          uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); 
         } 
        } 
       }); 
    } 



    private static class FinalizableDelegatedExecutorService 
      extends DelegatedExecutorService { 
     FinalizableDelegatedExecutorService(ExecutorService executor) { 
      super(executor); 
     } 
     protected void finalize() { 
      super.shutdown(); 
     } 
    } 

    /** 
    * A wrapper class that exposes only the ExecutorService methods 
    * of an ExecutorService implementation. 
    */ 
    private static class DelegatedExecutorService extends AbstractExecutorService { 
     private final ExecutorService e; 
     DelegatedExecutorService(ExecutorService executor) { e = executor; } 
     public void execute(Runnable command) { e.execute(command); } 
     public void shutdown() { e.shutdown(); } 
     public List shutdownNow() { return e.shutdownNow(); } 
     public boolean isShutdown() { return e.isShutdown(); } 
     public boolean isTerminated() { return e.isTerminated(); } 
     public boolean awaitTermination(long timeout, TimeUnit unit) 
       throws InterruptedException { 
      return e.awaitTermination(timeout, unit); 
     } 
     public Future submit(Runnable task) { 
      return e.submit(task); 
     } 
     public Future submit(Callable task) { 
      return e.submit(task); 
     } 
     public Future submit(Runnable task, T result) { 
      return e.submit(task, result); 
     } 
     public List> invokeAll(Collection> tasks) 
       throws InterruptedException { 
      return e.invokeAll(tasks); 
     } 
     public List> invokeAll(Collection> tasks, 
              long timeout, TimeUnit unit) 
       throws InterruptedException { 
      return e.invokeAll(tasks, timeout, unit); 
     } 
     public T invokeAny(Collection> tasks) 
       throws InterruptedException, ExecutionException { 
      return e.invokeAny(tasks); 
     } 
     public T invokeAny(Collection> tasks, 
           long timeout, TimeUnit unit) 
       throws InterruptedException, ExecutionException, TimeoutException { 
      return e.invokeAny(tasks, timeout, unit); 
     } 
    } 



    private SingleThreadExecutorWithExceptions() {} 
} 
Cuestiones relacionadas