2011-07-20 9 views
12

Tengo un ScheduledExecutorService con tareas programadas para ejecutarse en una hora. ¿Cómo obtengo la lista de tareas pendientes para poder forzarlas a ejecutar de inmediato?Cómo ejecutar tareas pendientes inmediatamente después de ExecutorService.shutdown()?

Creo shutdown() esperará una hora y se ve como si shutdownNow() devuelve una lista de Runnables que no puede ser run() debido a que la aplicación Ejecutable comprueba el estado Ejecutor y cuando se da cuenta de que se ha cerrado el Ejecutable se niega a ejecutar . Ver ScheduledThreadPoolExecutor.ScheduledFutureTask.run() para la implementación real.

¿Alguna idea?

Respuesta

3

que he tomado respuesta Mark Peters', la aplicación de todos los métodos abstractos, añadió hilo de seguridad y trató respetando la configuración ScheduledThreadPoolExecutor subyacente siempre que sea posible.

/** 
* Overrides shutdown() to run outstanding tasks immediately. 
* 
* @author Gili Tzabari 
*/ 
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService 
    implements ScheduledExecutorService 
{ 
    private final ScheduledExecutorService delegate; 
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; 
    private final ExecutorService immediateService; 
    private final ConcurrentMap<Future<?>, Callable<?>> tasks = Maps.newConcurrentMap(); 

    /** 
    * Creates a new RunOnShutdownScheduledExecutorService. 
    * 
    * @param delegate the executor to delegate to 
    */ 
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate) 
    { 
     Preconditions.checkNotNull(delegate, "delegate may not be null"); 

     this.delegate = delegate; 
     if (delegate instanceof ScheduledThreadPoolExecutor) 
     { 
      this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate; 
      this.immediateService = Executors.newFixedThreadPool(scheduledThreadPoolExecutor. 
       getCorePoolSize(), scheduledThreadPoolExecutor.getThreadFactory()); 
     } 
     else 
     { 
      scheduledThreadPoolExecutor = null; 
      this.immediateService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). 
       setNameFormat(RunOnShutdownScheduledExecutorService.class.getName() + "-%d").build()); 
     } 
    } 

    @Override 
    public boolean isShutdown() 
    { 
     return delegate.isShutdown(); 
    } 

    @Override 
    public boolean isTerminated() 
    { 
     return delegate.isTerminated(); 
    } 

    @Override 
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException 
    { 
     long before = System.nanoTime(); 
     if (!delegate.awaitTermination(timeout, unit)) 
      return false; 
     long after = System.nanoTime(); 
     long timeLeft = timeout - unit.convert(after - before, TimeUnit.NANOSECONDS); 
     return immediateService.awaitTermination(timeLeft, unit); 
    } 

    @Override 
    public void execute(Runnable command) 
    { 
     delegate.execute(command); 
    } 

    @Override 
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    { 
     CallableWithFuture<V> decorated = new CallableWithFuture<>(callable); 
     ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, callable); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = 
      delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public synchronized void shutdown() 
    { 
     if (delegate.isShutdown()) 
      return; 
     if (scheduledThreadPoolExecutor != null) 
     { 
      // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 
      // 
      // Cancel waiting scheduled tasks, otherwise executor won't shut down 
      scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 
     } 
     delegate.shutdown(); 
     // Users will not be able to cancel() Futures past this point so we're guaranteed that 
     // "tasks" will not be modified. 

     final List<Callable<?>> outstandingTasks = Lists.newArrayList(); 
     for (Map.Entry<Future<?>, Callable<?>> entry: tasks.entrySet()) 
     { 
      Future<?> future = entry.getKey(); 
      Callable<?> task = entry.getValue(); 

      if (future.isDone() && future.isCancelled()) 
      { 
       // Task called by the underlying executor, not the user. See CleaningScheduledFuture. 
       outstandingTasks.add(task); 
      } 
     } 
     tasks.clear(); 
     if (outstandingTasks.isEmpty()) 
     { 
      immediateService.shutdown(); 
      return; 
     } 

     immediateService.submit(new Callable<Void>() 
     { 
      @Override 
      public Void call() throws Exception 
      { 
       delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 

       // Execute outstanding tasks only after the delegate executor finishes shutting down 
       for (Callable<?> task: outstandingTasks) 
        immediateService.submit(task); 
       immediateService.shutdown(); 
       return null; 
      } 
     }); 
    } 

    @Override 
    public List<Runnable> shutdownNow() 
    { 
     return delegate.shutdownNow(); 
    } 

    /** 
    * A Runnable that removes its future when running. 
    */ 
    private class CleaningRunnable implements Runnable 
    { 
     private final Runnable delegate; 
     private Future<?> future; 

     /** 
     * Creates a new RunnableWithFuture. 
     * 
     * @param delegate the Runnable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningRunnable(Runnable delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<?> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public void run() 
     { 
      tasks.remove(future); 
      delegate.run(); 
     } 
    } 

    /** 
    * A Callable that removes its future when running. 
    */ 
    private class CallableWithFuture<V> implements Callable<V> 
    { 
     private final Callable<V> delegate; 
     private Future<V> future; 

     /** 
     * Creates a new CallableWithFuture. 
     * 
     * @param delegate the Callable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CallableWithFuture(Callable<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<V> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public V call() throws Exception 
     { 
      tasks.remove(future); 
      return delegate.call(); 
     } 
    } 

    /** 
    * A ScheduledFuture that removes its future when canceling. 
    * 
    * This allows us to differentiate between tasks canceled by the user and the underlying 
    * executor. Tasks canceled by the user are removed from "tasks". 
    * 
    * @param <V> The result type returned by this Future 
    */ 
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V> 
    { 
     private final ScheduledFuture<V> delegate; 

     /** 
     * Creates a new MyScheduledFuture. 
     * 
     * @param delegate the future to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningScheduledFuture(ScheduledFuture<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     @Override 
     public long getDelay(TimeUnit unit) 
     { 
      return delegate.getDelay(unit); 
     } 

     @Override 
     public int compareTo(Delayed o) 
     { 
      return delegate.compareTo(o); 
     } 

     @Override 
     public boolean cancel(boolean mayInterruptIfRunning) 
     { 
      boolean result = delegate.cancel(mayInterruptIfRunning); 

      if (result) 
      { 
       // Tasks canceled by users are removed from "tasks" 
       tasks.remove(delegate); 
      } 
      return result; 
     } 

     @Override 
     public boolean isCancelled() 
     { 
      return delegate.isCancelled(); 
     } 

     @Override 
     public boolean isDone() 
     { 
      return delegate.isDone(); 
     } 

     @Override 
     public V get() throws InterruptedException, ExecutionException 
     { 
      return delegate.get(); 
     } 

     @Override 
     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
      TimeoutException 
     { 
      return delegate.get(timeout, unit); 
     } 
    } 
} 
+1

Acabo de descubrir un error desagradable en ScheduledThreadPoolExecutor. Si un subproceso de trabajador está esperando una tarea que solo se ejecutará en una hora y usted cancela esa tarea, el trabajador seguirá esperando y el ejecutor no se apagará. Archivé un informe de error: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 – Gili

+0

Actualicé mi respuesta con una solución para el error # 7069418 – Gili

0

¡Buena pregunta! Sin embargo, parece que puedes estar solo arreglando una solución.

Una opción podría ser envolver el ScheduledThreadPoolExecutor con su propia implementación de ScheduledExecutorService. Cuando llega el momento de cerrar el servicio, cancele las tareas que se pueden cancelar y en su lugar, envíelas a un servicio que las ejecutará de inmediato. Entonces shutdown() ese servicio.

Aquí hay un código muy difícil que demuestra lo que quiero decir, aunque te advierto que puede haber riesgos aquí, ya que fue creado en pocos minutos. En particular, no me he esforzado demasiado para asegurarme de que esto sea seguro para los hilos.

class RunOnShutdownScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService { 
    private final ScheduledExecutorService delegateService; 

    private Map<Future<?>, Runnable> scheduledFutures = 
      Collections.synchronizedMap(new IdentityHashMap<Future<?>, Runnable>()); 


    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegateService) { 
     this.delegateService = delegateService; 
    } 

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 
     ScheduledFuture<?> future = delegateService.schedule(command, delay, unit); 
     scheduledFutures.put(future, command); 
     return future; 
    } 

    public void shutdown() { 
     delegateService.shutdown(); 
     ExecutorService immediateService = Executors.newFixedThreadPool(5); 
     for (Map.Entry<Future<?>, Runnable> entry : scheduledFutures.entrySet()) { 
      Future<?> future = entry.getKey(); 
      Runnable task = entry.getValue(); 
      if (!future.isDone()) { 
       if (future.cancel(false)) { 
        immediateService.submit(task); 
       } 
      } 
     } 
     immediateService.shutdown(); 
    } 

    //... 
} 
+0

Otro enfoque (discutido por el Javadoc de ScheduledThreadPoolExecutor) parece ser anular decorateTask(). Creo que podría conducir a una implementación más simple. – Gili

+0

@Gili: Parece que eso es una posibilidad. Te sugiero que publiques una respuesta a tu pregunta si parece factible. ¿Sugeriría que siga delegando a un ejecutor inmediato, o estaba pensando que podría decorar la tarea de tal manera que al apagarla cambie su programación? –

+0

No creo que puedas mezclar 'decorateTask()' con la delegación de ejecutores, pero lo pensaré un poco más. – Gili

Cuestiones relacionadas