2010-08-10 33 views
42

Estoy tratando de ejecutar muchas tareas usando un ThreadPoolExecutor. A continuación se muestra un ejemplo hipotético:Bloque ThreadPoolExecutor cuando la cola está llena?

def workQueue = new ArrayBlockingQueue<Runnable>(3, false) 
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) 
for(int i = 0; i < 100000; i++) 
    threadPoolExecutor.execute(runnable) 

El problema es que rápidamente consigo un java.util.concurrent.RejectedExecutionException ya que el número de tareas supera el tamaño de la cola de trabajo. Sin embargo, el comportamiento deseado que estoy buscando es tener el bloque de hilos principal hasta que haya espacio en la cola. Cuál es la mejor manera de lograr esto?

+2

Eche un vistazo a esta pregunta: http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated – Kiril

+2

[Esta respuesta] (http : //stackoverflow.com/a/4522411/394431) a otra pregunta sugiere usar una subclase 'BlockingQueue' personalizada que bloquea en' offer() 'delegando en' put() '. Creo que termina trabajando más o menos igual que el 'RejectedExecutionHandler' que llama a' getQueue(). Put() '. –

+2

Poner directamente en cola sería incorrecto, como se explica en esta respuesta http://stackoverflow.com/a/3518588/585903 –

Respuesta

45

En algunas circunstancias muy limitadas, puede implementar un java.util.concurrent.RejectedExecutionHandler que hace lo que necesita.

RejectedExecutionHandler block = new RejectedExecutionHandler() { 
    rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    executor.getQueue().put(r); 
    } 
}; 

ThreadPoolExecutor pool = new ... 
pool.setRejectedExecutionHandler(block); 

Ahora. Esta es una muy mala idea por las siguientes razones

  • Es propenso a un punto muerto, porque todos los hilos de la piscina pueden morir antes de lo que usted pone en la cola es visible. Mitigue esto estableciendo un tiempo razonable de mantener vivo.
  • La tarea no está envuelta de la manera que su ejecutor puede esperar. Muchas implementaciones de ejecutores envuelven sus tareas en algún tipo de objeto de seguimiento antes de la ejecución. Mira la fuente de los tuyos.
  • Agregar mediante getQueue() es desaconsejado por la API, y puede estar prohibido en algún momento.

Una estrategia casi siempre mejor es instalar ThreadPoolExecutor.CallerRunsPolicy que estrangulará tu aplicación ejecutando la tarea en el hilo que llama a execute().

Sin embargo, a veces una estrategia de bloqueo, con todos sus riesgos inherentes, es realmente lo que desea. Yo diría que en estas condiciones

  • Sólo tiene un hilo de llamar a execute()
  • Tienes que (o quiere) tienen una muy pequeña longitud de la cola
  • Es absolutamente necesario limitar el número de los hilos que ejecutan este trabajo (generalmente por razones externas), y una estrategia de ejecución de la llamada rompería eso.
  • Sus tareas son de un tamaño impredecible, por lo que las llamadas pueden causar inanición si el grupo estuvo momentáneamente ocupado con 4 tareas cortas y su ejecución de llamada de un hilo quedó atascada con una gran.

Así que, como digo. Rara vez se necesita y puede ser peligroso, pero ahí lo tienes.

Buena suerte.

+2

Una respuesta muy bien pensada. Tomo un pequeño inconveniente con su condición que> "Tiene que (o desea) tener una longitud de cola muy pequeña". Es posible que no pueda predecir cuántas tareas pondrá en cola un trabajo determinado. Tal vez está ejecutando un trabajo diario que procesa datos de algún DB y el lunes hay 500 registros para procesar, pero el martes hay 50,000. Tienes que establecer un límite superior en tu cola para que no explotes cuando llegue un gran trabajo. En ese caso, no hay inconveniente en esperar que se completen algunas tareas antes de hacer más cola. – skelly

+0

Fantástica respuesta, gracias –

+0

"Es propenso a un punto muerto porque todos los hilos en la agrupación pueden morir antes de que lo que colocaste en la cola sea visible. Mitiga esto estableciendo un tiempo razonable de mantener vivo". ¿No se puede evitar el interbloqueo estableciendo el tamaño del grupo mínimo a un valor mayor que cero? Cualquier otro motivo es que Java no tiene soporte integrado para bloquear las colas de los ejecutores. Lo cual es interesante, porque parece ser una estrategia bastante razonable. Me pregunto cuál es el razonamiento. –

1

Aquí es mi fragmento de código en este caso:

public void executeBlocking(Runnable command) { 
    if (threadPool == null) { 
     logger.error("Thread pool '{}' not initialized.", threadPoolName); 
     return; 
    } 
    ThreadPool threadPoolMonitor = this; 
    boolean accepted = false; 
    do { 
     try { 
      threadPool.execute(new Runnable() { 
       @Override 
       public void run() { 
        try { 
         command.run(); 
        } 
        // to make sure that the monitor is freed on exit 
        finally { 
         // Notify all the threads waiting for the resource, if any. 
         synchronized (threadPoolMonitor) { 
          threadPoolMonitor.notifyAll(); 
         } 
        } 
       } 
      }); 
      accepted = true; 
     } 
     catch (RejectedExecutionException e) { 
      // Thread pool is full 
      try { 
       // Block until one of the threads finishes its job and exits. 
       synchronized (threadPoolMonitor) { 
        threadPoolMonitor.wait(); 
       } 
      } 
      catch (InterruptedException ignored) { 
       // return immediately 
       break; 
      } 
     } 
    } while (!accepted); 
} 

ThreadPool es una instancia local de java.util.concurrent.ExecutorService que se ha inicializado ya.

0

He resuelto este problema utilizando una costumbre RejectedExecutionHandler, que simplemente bloquea el subproceso de llamada por un tiempo y luego intenta enviar de nuevo la tarea:

public class BlockWhenQueueFull implements RejectedExecutionHandler { 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 

     // The pool is full. Wait, then try again. 
     try { 
      long waitMs = 250; 
      Thread.sleep(waitMs); 
     } catch (InterruptedException interruptedException) {} 

     executor.execute(r); 
    } 
} 

Esta clase sólo se puede utilizar en el hilo de la piscina ejecutor como RejectedExecutionHandler como cualquier otro. En este ejemplo:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull()) 

El único inconveniente que veo es que el subproceso de la llamada puede ser que consiga bloqueado un poco más largo de lo estrictamente necesario (hasta 250 ms). Para muchas tareas de ejecución breve, quizás disminuya el tiempo de espera a 10 ms o menos. Además, dado que este ejecutor se está llamando efectivamente de forma recursiva, esperar mucho tiempo para que un hilo esté disponible (horas) puede provocar un desbordamiento de la pila.

Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien. ¿Me estoy perdiendo algo importante?

Cuestiones relacionadas