2012-06-28 21 views
11

Necesito implementar un grupo de subprocesos en Java (java.util.concurrent) cuyo número de subprocesos tiene un valor mínimo cuando está inactivo, crece hasta un límite superior (pero nunca Además, cuando los trabajos se envían más rápido de lo que terminan de ejecutarse, y se reduce al límite inferior cuando se realizan todos los trabajos y no se envían más trabajos.Creación de un grupo de subprocesos dinámico (creciente/decreciente)

¿Cómo implementarías algo así? Imagino que este sería un escenario de uso bastante común, pero aparentemente los métodos de fábrica java.util.concurrent.Executors solo pueden crear pools y pools de tamaño fijo que crecen sin límites cuando se envían muchos trabajos. La clase ThreadPoolExecutor proporciona los parámetros corePoolSize y maximumPoolSize, pero su documentación parece implicar que la única manera de tener más de corePoolSize hilos al mismo tiempo es usar una cola de trabajos acotada, en cuyo caso, si ha alcanzado los hilos maximumPoolSize, ¿Obtendrá rechazos de trabajo que tiene que tratar usted mismo? Se me ocurrió esto:

//pool creation 
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS, 
    new ArrayBlockingQueue<Runnable>(minSize)); 
... 

//submitting jobs 
for (Runnable job : ...) { 
    while (true) { 
     try { 
      pool.submit(job); 
      System.out.println("Job " + job + ": submitted"); 
      break; 
     } catch (RejectedExecutionException e) { 
      // maxSize jobs executing concurrently atm.; re-submit new job after short wait 
      System.out.println("Job " + job + ": rejected..."); 
      try { 
       Thread.sleep(300); 
      } catch (InterruptedException e1) { 
      } 
     } 
    } 
} 

¿Estoy pasando por alto algo? ¿Hay una mejor manera de hacer esto? Además, dependiendo de los requisitos, puede ser problemático que el código anterior no termine hasta que al menos (creo) (total number of jobs) - maxSize trabajos hayan terminado. Entonces, si desea poder enviar un número arbitrario de trabajos al grupo y proceder de inmediato sin esperar a que termine ninguno de ellos, no veo cómo podría hacerlo sin tener un hilo de "sumisión de trabajo" dedicado que administre la cola ilimitada requerida para contener todos los trabajos enviados. AFAICS, si está utilizando una cola ilimitada para ThreadPoolExecutor, su recuento de hilos nunca crecerá más allá de corePoolSize.

+3

Debo admitir que no veo la utilidad de un threadpool de tamaño dinámico. ¿Su número de procesadores en su placa cambia durante el tiempo de actividad de su aplicación? – corsiKa

+3

¿Por qué 'newCachedThreadPool' no es adecuado para su situación? Elimina automáticamente los hilos que ya no se utilizan. – Tudor

+0

¿Qué pasaría si tus hilos inactivos no murieran? Supongamos que tiene un grupo de tamaño fijo del tamaño máximo todo el tiempo. ¿Qué pasaría? –

Respuesta

4

Un truco que podría ayudarte es asignar un RejectedExecutionHandler que use el mismo hilo para enviar el trabajo a la cola de bloqueo. Eso bloqueará el hilo actual y eliminará la necesidad de algún tipo de ciclo.

Véase mi respuesta aquí:

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

Aquí está el controlador de rechazo copiado de esa respuesta.

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200); 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 
     0L, TimeUnit.MILLISECONDS, queue); 
// by default (unfortunately) the ThreadPoolExecutor will call the rejected 
// handler when you submit the 201st job, to have it block you do: 
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     // this will block if the queue is full 
     executor.getQueue().put(r); 
    } 
}); 

A continuación, debería ser capaz de hacer uso del hilo de núcleo/máx contar el tiempo que se da cuenta de que la cola de bloqueo delimitada que utilice primera se llena antes de que los hilos se crean por encima de los hilos de núcleo. Entonces, si tiene 10 hilos principales y quiere que el 11 ° trabajo comience el 11 °, necesitará una cola de bloqueo con un tamaño de 0 desafortunadamente (tal vez un SynchronousQueue). Siento que esta es una limitación real en las clases por lo demás excelentes ExecutorService.

1

Conjunto maximumPoolSize a Integer.MAX_VALUE. Si alguna vez tiene más de 2 mil millones de hilos ... bueno, buena suerte con eso.

De todos modos, el Javadoc de ThreadPoolExecutor estados:

Al establecer maximumPoolSize a un valor esencialmente ilimitada como Integer.MAX_VALUE, que permita la piscina para dar cabida a un número arbitrario de tareas concurrentes. Lo más habitual es que los tamaños de grupo máximo y central se establezcan solo durante la construcción, pero también pueden cambiarse dinámicamente usando setCorePoolSize (int) y setMaximumPoolSize (int).

Con una cola de tareas igualmente ilimitada como LinkedBlockingQueue, esto debería tener una capacidad arbitrariamente grande.

+0

¿Le importaría explicar al infractor? –

+0

Gracias también refiérase a este http://stackoverflow.com/questions/28567238/threadpoolexecutor-does-not-shrink-properly/40384042#40384042 contracción de otros problemas resueltos en otra pregunta – Vahid

+0

que quería que fuera limitada, no ilimitada. .. – Xerus

8

Cuando crece y se contrae el hilo, solo hay un nombre que me viene a la mente: CachedThreadPool del paquete java.util.concurrent.

ExecutorService executor = Executors.newCachedThreadPool(); 

CachedThreadPool() puede reutilizar el hilo, así como crear nuevos temas cuando sea necesario. Y sí, si un subproceso está inactivo durante 60 segundos, CachedThreadPool lo matará. Así que esto es bastante liviano: ¡crece y se reduce en sus palabras!

+6

Correcto, pero no está limitado. – Gray

+0

Puede usar ThreadPoolExecutor subyacente y configurar maximumPoolSize manualmente o incluso en tiempo de ejecución –

Cuestiones relacionadas