2010-02-11 9 views
22

Estoy usando ExecutorService para facilitar el programa multiproceso concurrente. Tomar siguiente código:ExecutorService, forma estándar para evitar que la cola de tareas se llene demasiado

 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
Future<..> ... = exService.submit(..); 
... 
} 
 

En mi caso, el problema es que enviar() no se bloquea si están ocupados todos los NUMBER_THREADS. La consecuencia es que la cola de Tareas se inunda con muchas tareas. La consecuencia de esto es que el cierre del servicio de ejecución con ExecutorService.shutdown() lleva años (ExecutorService.isTerminated() será falso durante mucho tiempo). La razón es que la cola de tareas todavía está bastante llena.

Por ahora mi solución es trabajar con semáforos para no permitir a tener que muchas entradas dentro de la cola de tareas de ExecutorService:

 

... 
Semaphore semaphore=new Semaphore(NUMBER_THREADS); 

while(xxx) 
ExecutorService exService = Executors.newFixedThreadPool(NUMBER_THREADS); 
... 
semaphore.aquire(); 
// internally the task calls a finish callback, which invokes semaphore.release() 
// -> now another task is added to queue 
Future<..> ... = exService.submit(..); 
... 
} 
 

Estoy seguro de que hay una solución mejor y más encapsulado?

Respuesta

4

Es mejor que usted mismo cree el ThreadPoolExecutor (que es lo que Executors.newXXX() hace de todos modos).

En el constructor, puede pasar un BlockingQueue para que lo ejecute el Ejecutor como cola de tareas. Si transfiere un BlockingQueue con restricciones de tamaño (como LinkedBlockingQueue), debe lograr el efecto que desee.

ExecutorService exService = new ThreadPoolExecutor(NUMBER_THREADS, NUMBER_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(workQueueSize)); 
+0

veo. pasé por alto un detalle en http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool%28int%29. allí se menciona que, como estándar, se utiliza una cola sin límites. –

+6

lo probé. desafortunadamente, su solución no se bloquea (como yo quiero) sino que arroja una excepción RejectedExecutionException. también encontrado: http://www.velocityreviews.com/forums/t389526-threadpoolexecutor-with-blocking-execute.html. las soluciones presentadas parecen ser más complicadas que mi ejemplo de semáforo, ¡maldita sea! –

+3

esto no funciona debido a RejectedExecutionException si la cola está llena – user249654

5

Puede ThreadPoolExecutor.getQueue(). Size() para averiguar el tamaño de la cola de espera. Puede realizar una acción si la cola es demasiado larga. Sugiero ejecutar la tarea en el hilo actual si la cola es demasiado larga para ralentizar al productor (si es apropiado)

4

Un bloqueo real ThreadPoolExecutor ha estado en la lista de deseos de muchos, incluso hay un error JDC abierto en él . que estoy enfrentando el mismo problema, y ​​me encontré con esto: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html

Es una implementación de un BlockingThreadPoolExecutor, implementado usando un RejectionPolicy que utiliza oferta para agregar la tarea a la cola, a la espera de la cola para tener espacio. Se ve bien.

+0

[Esta respuesta] (http://stackoverflow.com/a/4522411/394431) a otra pregunta sugiere usar una subclase 'BlockingQueue' personalizada que bloquea en' offer() 'delegando en' put() '. Creo que termina trabajando más o menos igual que este 'RejectedExecutionHandler'. –

1

puede agregar otra cola de bloquing que tiene un tamaño limitado para controlar el tamaño de la cola interna en el ejecutorService, algunos piensan como un semáforo pero es muy fácil. antes del ejecutor que pones() y cuando la tarea se realiza(). tomar() debe ser dentro del código tarea

21

El truco es usar un tamaño de cola fija y:

new ThreadPoolExecutor.CallerRunsPolicy() 

También se recomienda el uso de guayaba ListeningExecutorService. Aquí hay un ejemplo de colas consumidor/productor.

private ListeningExecutorService producerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 
private ListeningExecutorService consumerExecutorService = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20)); 

private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 
            5000L, TimeUnit.MILLISECONDS, 
            new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy()); 
} 

Cualquier cosa mejor y es posible que desee considerar un MQ como RabbitMQ o ActiveMQ ya que tienen la tecnología QoS.

+0

Gran respuesta, con un ejemplo completo. –

Cuestiones relacionadas