2009-09-12 21 views
5

Tengo una cola de tareas en Java. Esta cola está en una tabla en el DB.Cómo administrar subprocesos M (1 por tarea) asegurando solo N subprocesos al mismo tiempo. Con N <M. En Java

I necesidad de:

  • 1 hilo por tarea sólo
  • No más de hilos N se ejecutan al mismo tiempo. Esto se debe a que los hilos tienen interacción DB y no quiero tener un grupo de conexiones DB abiertas.

creo que podría hacer algo como:

final Semaphore semaphore = new Semaphore(N); 
while (isOnJob) { 
    List<JobTask> tasks = getJobTasks(); 
    if (!tasks.isEmpty()) { 
     final CountDownLatch cdl = new CountDownLatch(tasks.size()); 
     for (final JobTask task : tasks) { 
      Thread tr = new Thread(new Runnable() { 

       @Override 
       public void run() { 
        semaphore.acquire(); 
        task.doWork(); 
        semaphore.release(); 
        cdl.countDown(); 
       } 

      }); 
     } 
     cdl.await(); 
    } 
} 

sé que existe una clase ExecutorService, pero no estoy seguro si es que se puede utilizar para esto.

Entonces, ¿crees que esta es la mejor manera de hacerlo? ¿O podría aclararme cómo funciona ExecutorService para resolver esto?

solución final:

creo que la mejor solución es algo así como:

while (isOnJob) { 
    ExecutorService executor = Executors.newFixedThreadPool(N); 
    List<JobTask> tasks = getJobTasks(); 
    if (!tasks.isEmpty()) { 
     for (final JobTask task : tasks) { 
      executor.submit(new Runnable() { 

       @Override 
       public void run() { 
        task.doWork(); 
       } 

      }); 
     } 
    } 
    executor.shutdown(); 
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS); 
} 

Gracias mucho por los awnsers. Por cierto, estoy usando un grupo de conexiones, pero las consultas a la base de datos son muy pesadas y no quiero tener un número incontrolado de tareas al mismo tiempo.

Respuesta

7

De hecho, puede utilizar un ExecutorService. Por ejemplo, cree un nuevo grupo de subprocesos fijos utilizando el método newFixedThreadPool. De esta manera, además de los hilos de caché, también garantiza que no se están ejecutando más de n hilos al mismo tiempo.

Algo a lo largo de estas líneas:

private static final ExecutorService executor = Executors.newFixedThreadPool(N); 
// ... 
while (isOnJob) { 
    List<JobTask> tasks = getJobTasks(); 
    if (!tasks.isEmpty()) { 
     List<Future<?>> futures = new ArrayList<Future<?>>(); 
     for (final JobTask task : tasks) { 
       Future<?> future = executor.submit(new Runnable() {  
         @Override 
         public void run() { 
           task.doWork(); 
         } 
       }); 
       futures.add(future); 
     } 
     // you no longer need to use await 
     for (Future<?> fut : futures) { 
      fut.get(); 
     } 
    } 
} 

Tenga en cuenta que ya no es necesario utilizar el pestillo, como get esperará a que el cálculo para completar, si es necesario.

+0

Así que parece que tampoco necesito el semáforo, ¿verdad? – user2427

+0

Sí, también puedes eliminarlo. –

0

Lograr un buen rendimiento también depende del tipo de trabajo que se necesita realizar en los hilos. Si su DB es el cuello de botella en el procesamiento, comenzaría a prestar atención a cómo sus subprocesos acceden a la base de datos. El uso de un grupo de conexión probablemente esté en orden. Esto podría ayudarlo a obtener más rendimiento, ya que los subprocesos de trabajo pueden volver a utilizar las conexiones de base de datos desde el grupo.

4

Estoy de acuerdo con JG que ExecutorService es el camino a seguir ... pero creo que ambos lo están haciendo más complicado de lo necesario.

En lugar de crear un gran número de hilos (1 por tarea) ¿por qué no crear un conjunto fijo de hilo de tamaño (con Executors.newFixedThreadPool(N)) y presentar todas las tareas en él? No hay necesidad de un semáforo ni nada por el estilo; solo envíe los trabajos al grupo de subprocesos a medida que los obtiene, y el grupo de subprocesos los manejará con hasta N subprocesos a la vez.

Si no va a utilizar más de N subprocesos a la vez, ¿por qué desea crearlos?

1

Utilice un ThreadPoolExecutor ejemplo, con una cola no unido y tamaño máximo fijo de los hilos, por ejemplo, Executors.newFixedThreadPool (N). Esto aceptará una gran cantidad de tareas, pero solo ejecutará N de forma concurrente.

Si elige una cola acotada en su lugar (con una capacidad de N) la Ejecutor rechazará la ejecución de la tarea (¿cómo es exactamente depende de la política se puede configurar cuando se trabaja con ThreadPoolExecutor directamente, en vez de utilizar Ejecutores fábrica - ver RejectedExecutionHandler).

Si necesita "real" control de congestión se debe configurar un límite BlockingQueue con una capacidad de N. Obtenga las tareas que desea hacer desde la base de datos y ponga en la cola; si está llena, el hilo de llamada se bloqueará. En otro hilo (quizás también comenzó a usar el Ejecutor API) que toma tareas de la BlockingQueue y someterlos a la Ejecutor. Si BlockingQueue está vacío, el hilo de llamada también se bloqueará. Para indicar que ha terminado, use un objeto "especial" (por ejemplo, un singleton que marque el último/último elemento en la cola).

Cuestiones relacionadas