2010-05-03 6 views
73

Estoy buscando una implementación ExecutorService que se puede proporcionar con un tiempo de espera. Las tareas que se envían al ExecutorService se interrumpen si tardan más tiempo que el tiempo de espera en ejecutarse. Implementar tal bestia no es una tarea tan difícil, pero me pregunto si alguien sabe de una implementación existente.ExecutorService que interrumpe las tareas después de un tiempo de espera

Esto es lo que surgió en función de algunas de las discusiones a continuación. ¿Algún comentario?

import java.util.List; 
import java.util.concurrent.*; 

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor { 
    private final long timeout; 
    private final TimeUnit timeoutUnit; 

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(); 
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>(); 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); 
     this.timeout = timeout; 
     this.timeoutUnit = timeoutUnit; 
    } 

    @Override 
    public void shutdown() { 
     timeoutExecutor.shutdown(); 
     super.shutdown(); 
    } 

    @Override 
    public List<Runnable> shutdownNow() { 
     timeoutExecutor.shutdownNow(); 
     return super.shutdownNow(); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 
     if(timeout > 0) { 
      final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit); 
      runningTasks.put(r, scheduled); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     ScheduledFuture timeoutTask = runningTasks.remove(r); 
     if(timeoutTask != null) { 
      timeoutTask.cancel(false); 
     } 
    } 

    class TimeoutTask implements Runnable { 
     private final Thread thread; 

     public TimeoutTask(Thread thread) { 
      this.thread = thread; 
     } 

     @Override 
     public void run() { 
      thread.interrupt(); 
     } 
    } 
} 
+0

es que la 'hora de inicio' del tiempo de espera de la fecha de presentación? ¿O el momento en que la tarea comienza a ejecutarse? –

+0

Buena pregunta. Cuando comienza a ejecutarse. Presumiblemente utilizando el gancho 'protected void beforeExecute (Thread t, Runnable r)'. –

+0

@ scompt.com sigue usando esta solución o ha sido reemplazada –

Respuesta

69

Se puede utilizar un ScheduledExecutorService para esto. Primero lo enviaría solo una vez para comenzar inmediatamente y retener el futuro que se crea. Después de eso, puede enviar una nueva tarea que cancelaría el futuro retenido después de un período de tiempo.

ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
final Future handler = executor.submit(new Callable(){ ... }); 
executor.schedule(new Runnable(){ 
    public void run(){ 
     handler.cancel(); 
    }  
}, 10000, TimeUnit.MILLISECONDS); 

Esto ejecutará el controlador (principal funcionalidad a las interrupciones) durante 10 segundos, a continuación, se cancelará (es decir, interrupción) esa tarea específica.

+9

Idea interesante, pero ¿qué pasa si la tarea termina antes del tiempo de espera (que normalmente lo hará)? Prefiero no tener toneladas de tareas de limpieza esperando para ejecutar solo para descubrir que su tarea asignada ya se ha completado. Tendría que haber otro hilo supervisando los futuros mientras terminan para eliminar sus tareas de limpieza. –

+2

El ejecutor solo programará esta cancelación una vez. Si la tarea se completa, la cancelación no es una operación y el trabajo continúa sin cambios. Solo debe haber un esquema de subprocesos adicional para cancelar las tareas y un subproceso para ejecutarlos. Puede tener dos ejecutores, uno para enviar sus tareas principales y otro para cancelarlas. –

+2

Eso es cierto, pero ¿qué sucede si el tiempo de espera es de 5 horas y en ese tiempo se ejecutan 10k tareas? Me gustaría evitar tener todos esos no-ops por ahí ocupando memoria y causando interruptores de contexto. –

3

Envuelva la tarea en FutureTask y puede especificar el tiempo de espera para FutureTask. Mira el ejemplo en mi respuesta a esta pregunta,

java native Process timeout

+0

Me doy cuenta de que hay varias maneras de hacerlo utilizando las clases 'java.util.concurrent', pero estoy buscando una implementación' ExecutorService'. –

+0

Si dice que desea que su ExecutorService oculte el hecho de que se están agregando tiempos de espera del código del cliente, podría implementar su propio ExecutorService que envuelva todos los ejecutables entregados con un FutureTask antes de ejecutarlos. – erikprice

5

Lamentablemente, la solución es defectuosa. Hay un tipo de error con ScheduledThreadPoolExecutor, también reportado en this question: cancelar una tarea enviada no libera completamente los recursos de memoria asociados con la tarea; los recursos se lanzan solo cuando la tarea expira.

Si, por lo tanto, crea un TimeoutThreadPoolExecutor con un tiempo de caducidad bastante largo (un uso típico) y envía tareas lo suficientemente rápido, termina llenando la memoria, aunque las tareas realmente se completaron correctamente.

se puede ver el problema con el siguiente programa de prueba (muy cruda):

public static void main(String[] args) throws InterruptedException { 
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
      new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES); 
    //ExecutorService service = Executors.newFixedThreadPool(1); 
    try { 
     final AtomicInteger counter = new AtomicInteger(); 
     for (long i = 0; i < 10000000; i++) { 
      service.submit(new Runnable() { 
       @Override 
       public void run() { 
        counter.incrementAndGet(); 
       } 
      }); 
      if (i % 10000 == 0) { 
       System.out.println(i + "/" + counter.get()); 
       while (i > counter.get()) { 
        Thread.sleep(10); 
       } 
      } 
     } 
    } finally { 
     service.shutdown(); 
    } 
} 

El programa agota la memoria disponible, aunque se espera que los generados Runnable s para completar.

Pensé en esto por un tiempo, pero lamentablemente no pude encontrar una buena solución.

EDIT: Me enteré de que este problema fue informado como JDK bug 6602600, y parece que se ha corregido recientemente.

1

Parece problema no está en el error de JDK 6.602.600 (que se resolvió en 2010-05-22), pero en llamada incorrecta de sueño (10) en el círculo. Además, note que el hilo principal debe dar directamente CHANCE a otros hilos para realizar sus tareas por invocación SLEEP (0) en CADA rama del círculo exterior. Es mejor, creo, utilizar Thread.yield() en lugar de Thread.sueño (0)

El resultado corregido parte de código de problema anterior es tal como esto:

....................... 
........................ 
Thread.yield();   

if (i % 1000== 0) { 
System.out.println(i + "/" + counter.get()+ "/"+service.toString()); 
} 

//     
//    while (i > counter.get()) { 
//     Thread.sleep(10); 
//    } 

funciona correctamente con la cantidad de contador exterior hasta 150 000 000 círculos ensayadas.

0

¿Qué pasa con esta idea alternativa:

  • dos tienen dos ejecutores:
    • uno para:
      • la presentación de la tarea, sin preocuparse por el tiempo de espera de la tarea
      • añadiendo el futuro resultó y el momento en que debería terminar en una estructura interna
    • uno para ejecutar un trabajo interno que comprueba la estructura interna si algunas tareas tienen un tiempo de espera y si deben cancelarse.

Pequeña muestra está aquí:

public class AlternativeExecutorService 
{ 

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue  = new CopyOnWriteArrayList(); 
private final ScheduledThreadPoolExecutor    scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job 
private final ListeningExecutorService     threadExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for 
private ScheduledFuture scheduledFuture; 
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L; 

public AlternativeExecutorService() 
{ 
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS); 
} 

public void pushTask(OwnTask task) 
{ 
    ListenableFuture<Void> future = threadExecutor.submit(task); // -> create your Callable 
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end 
} 

public void shutdownInternalScheduledExecutor() 
{ 
    scheduledFuture.cancel(true); 
    scheduledExecutor.shutdownNow(); 
} 

long getCurrentMillisecondsTime() 
{ 
    return Calendar.getInstance().get(Calendar.MILLISECOND); 
} 

class ListenableFutureTask 
{ 
    private final ListenableFuture<Void> future; 
    private final OwnTask    task; 
    private final long     milliSecEndTime; 

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime) 
    { 
     this.future = future; 
     this.task = task; 
     this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS); 
    } 

    ListenableFuture<Void> getFuture() 
    { 
     return future; 
    } 

    OwnTask getTask() 
    { 
     return task; 
    } 

    long getMilliSecEndTime() 
    { 
     return milliSecEndTime; 
    } 
} 

class TimeoutManagerJob implements Runnable 
{ 
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList() 
    { 
     return futureQueue; 
    } 

    @Override 
    public void run() 
    { 
     long currentMileSecValue = getCurrentMillisecondsTime(); 
     for (ListenableFutureTask futureTask : futureQueue) 
     { 
      consumeFuture(futureTask, currentMileSecValue); 
     } 
    } 

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue) 
    { 
     ListenableFuture<Void> future = futureTask.getFuture(); 
     boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue; 
     if (isTimeout) 
     { 
      if (!future.isDone()) 
      { 
       future.cancel(true); 
      } 
      futureQueue.remove(futureTask); 
     } 
    } 
} 

class OwnTask implements Callable<Void> 
{ 
    private long  timeoutDuration; 
    private TimeUnit timeUnit; 

    OwnTask(long timeoutDuration, TimeUnit timeUnit) 
    { 
     this.timeoutDuration = timeoutDuration; 
     this.timeUnit = timeUnit; 
    } 

    @Override 
    public Void call() throws Exception 
    { 
     // do logic 
     return null; 
    } 

    public long getTimeoutDuration() 
    { 
     return timeoutDuration; 
    } 

    public TimeUnit getTimeUnit() 
    { 
     return timeUnit; 
    } 
} 
} 
1

Después montón de tiempo a la encuesta,
Por último, utilizo invokeAll método de ExecutorService para resolver este problema.
Eso interrumpirá estrictamente la tarea mientras se ejecuta la tarea. Aquí es
ejemplo

ExecutorService executorService = Executors.newCachedThreadPool(); 

try { 
    List<Callable<Object>> callables = new ArrayList<>(); 
    // Add your long time task (callable) 
    callables.add(new VaryLongTimeTask()); 
    // Assign tasks for specific execution timeout (e.g. 2 sec) 
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS); 
    for (Future<Object> future : futures) { 
     // Getting result 
    } 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} 

executorService.shutdown(); 

El pro es que también puede presentar ListenableFuture al mismo ExecutorService.
Simplemente cambie ligeramente la primera línea de código.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 

ListeningExecutorService es la función de audición de ExecutorService al proyecto de Google de guayaba (com.google.guava))

+1

Gracias por señalar 'invokeAll'. Eso funciona muy bien Solo una palabra de advertencia para cualquiera que esté pensando en usar esto: aunque 'invokeAll' devuelve una lista de objetos' Future', en realidad parece ser una operación de bloqueo. – mxro

1

Usando John W respuesta creé una implementación que comienzan correctamente el tiempo de espera cuando la tarea se inicia su ejecución. Incluso escribo una prueba unitaria :)

Sin embargo, no se ajusta a mis necesidades, ya que algunas operaciones IO no interrumpen cuando se llama a Future.cancel() (es decir, cuando se llama al Thread.interrupted()).

De todos modos, si alguien está interesado, he creado una esencia: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf

Cuestiones relacionadas