2010-07-15 18 views
8

Hay una gran cantidad de tareas. Cada tarea pertenece a un solo grupo. El requisito es que cada grupo de tareas se ejecute en serie como si se ejecutara en un único subproceso y el rendimiento se maximizara en un entorno multi-core (o multi-cpu). Nota: también hay una gran cantidad de grupos que es proporcional al número de tareas.¿Qué ThreadPool en Java debería usar?

La solución ingenua utiliza ThreadPoolExecutor y sincroniza (o bloquea). Sin embargo, los hilos se bloquearían entre sí y el rendimiento no se maximizaría.

¿Alguna idea mejor? ¿O existe una biblioteca de terceros que satisfaga el requisito?

+2

"Sin embargo, los hilos podría bloquear entre sí y el rendimiento no está maximizada.". ¿Quiere decir que las tareas individuales acceden a una estructura o recurso de datos compartidos y esta es la causa de la disputa? – Adamski

+0

¿Conoces todas las tareas de un grupo por adelantado? Esto es importante al elegir una solución (colas versus sin colas) –

Respuesta

3

Un enfoque simple sería la de "concatenar" todas las tareas de grupo en una tarea súper, con lo que las sub-tareas se ejecutan en serie. Pero esto probablemente causará demoras en otros grupos que no se iniciarán a menos que algún otro grupo termine por completo y deje espacio en el grupo de subprocesos.

Como alternativa, considere encadenar las tareas de un grupo. El código siguiente ilustra que:

public class MultiSerialExecutor { 
    private final ExecutorService executor; 

    public MultiSerialExecutor(int maxNumThreads) { 
     executor = Executors.newFixedThreadPool(maxNumThreads); 
    } 

    public void addTaskSequence(List<Runnable> tasks) { 
     executor.execute(new TaskChain(tasks)); 
    } 

    private void shutdown() { 
     executor.shutdown(); 
    } 

    private class TaskChain implements Runnable { 
     private List<Runnable> seq; 
     private int ind; 

     public TaskChain(List<Runnable> seq) { 
      this.seq = seq; 
     } 

     @Override 
     public void run() { 
      seq.get(ind++).run(); //NOTE: No special error handling 
      if (ind < seq.size()) 
       executor.execute(this); 
     }  
    } 

La ventaja es que no se utiliza ninguna de recursos extra (hilo/cola), y que la granularidad de tareas es mejor que la del enfoque ingenuo. La desventaja es que todas las tareas del grupo deben conocerse por adelantado.

--edit--

Para hacer esta solución genérica y completa, es posible que desee para decidir sobre el manejo de errores (es decir, si una cadena se mantiene aunque occures un error), y también sería una buena idea para implementar ExecutorService y delegar todas las llamadas al ejecutor subyacente.

+0

¡Solución inteligente! +1 –

+0

Me gusta esta solución. – James

+0

Quizás también deberíamos agregar un mapa para que podamos encontrar el TaskChain de una Tarea especificada y agregarlo a su TaskChain. – James

2

sugeriría a utilizar las colas de tareas:

  • Por cada grupo de tareas que tiene crear una cola e insertar todas las tareas de ese grupo en ella.
  • Ahora todas sus colas se pueden ejecutar en paralelo, mientras que las tareas dentro de una cola se ejecutan en serie.

Una búsqueda rápida en Google sugiere que la API java no tiene tareas/colas de hilos por sí misma. Sin embargo, hay muchos tutoriales disponibles para codificar uno. Todos ustedes pueden enumerar buenos tutoriales/implementaciones si conocen alguno:

+0

Gracias Dave. Si hay una gran cantidad de grupos, el número de subprocesos llegará al límite. – James

+0

@James No necesariamente. Solo porque tenga n grupos no significa que necesite crear n hilos para ejecutarlos. Solo crea tantos hilos como creas que sean adecuados y ellos se encargarán de las colas, ya sea en forma de turno rotativo o en serie. –

1

Estoy de acuerdo en la respuesta de Dave, pero si necesita cortar el tiempo de CPU en todos los "grupos", es decir, todos los grupos de tareas deben progresar en paralelo, es posible que encontrar este tipo de construcción útil (. mediante la eliminación de "bloquear" esto funcionó bien en mi caso aunque imagino que tiende a utilizar más memoria):

class TaskAllocator { 
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork 
     = childQueuePerTaskGroup(); 

    public Queue<Runnable> lockTaskGroup(){ 
     return entireWork.poll(); 
    } 

    public void release(Queue<Runnable> taskGroup){ 
     entireWork.offer(taskGroup); 
    } 
} 

y

class DoWork implmements Runnable { 
    private final TaskAllocator allocator; 

    public DoWork(TaskAllocator allocator){ 
     this.allocator = allocator; 
    } 

    pubic void run(){ 
     for(;;){ 
      Queue<Runnable> taskGroup = allocator.lockTaskGroup(); 
      if(task==null){ 
       //No more work 
       return; 
      } 
      Runnable work = taskGroup.poll(); 
      if(work == null){ 
       //This group is done 
       continue; 
      } 

      //Do work, but never forget to release the group to 
      // the allocator. 
      try { 
       work.run(); 
      } finally { 
       allocator.release(taskGroup); 
      } 
     }//for 
    } 
} 

a continuación, puede utilizar número óptimo de hilos para ejecutar DoWork tarea. Es una especie de un equilibrio de carga por turnos ..

Incluso se puede hacer algo más sofisticado, utilizando esta vez de un simple cola en TaskAllocator (grupos de trabajo, con más tarea pendiente tienden a quedar ejecutada)

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator()); 

donde SophisticatedComparator es

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> { 
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){ 
     int diff = o2.size() - o1.size(); 
     if(diff==0){ 
      //This is crucial. You must assign unique ids to your 
      //Subqueue and break the equality if they happen to have same size. 
      //Otherwise your queues will disappear... 
      return o1.id - o2.id; 
     } 
     return diff; 
    } 
} 
+1

las colas de tareas +1 le permiten usar cualquier algoritmo de programación que se adapte a sus necesidades. –

+0

Parece que está volviendo a implementar un grupo de subprocesos. ¿Por qué no utilizar el ThreadPoolExecutor estándar más alguna funcionalidad adicional como en mi solución? Mi solución no requiere colas ni sincronización. –

+0

@Eyal: si está bien consumir grupos de tareas secuencialmente, estoy de acuerdo con usted. Sin embargo, si tienen que consumirse en paralelo, esto es necesario. –

0

Actor es también otra solución para este tipo específico de problemas. Scala tiene actores y también Java, proporcionado por AKKA.

-2

Tuve un problema similar al tuyo, y utilicé un ExecutorCompletionService que funciona con Executor para completar las colecciones de tareas. Aquí es un extracto de la API java.util.concurrent, ya Java7:

Suponga que tiene un conjunto de solucionadores para un determinado problema, cada uno de devolver un valor de algún tipo de resultado, y me gustaría ejecutar concurrentemente , procesando los resultados de cada uno de ellos que devuelven un valor no nulo, en algún uso del método (Resultado r). Se puede escribir esto como:

void solve(Executor e, Collection<Callable<Result>> solvers) 
     throws InterruptedException, ExecutionException { 
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e); 
    for (Callable<Result> s : solvers) 
     ecs.submit(s); 
    int n = solvers.size(); 
    for (int i = 0; i < n; ++i) { 
     Result r = ecs.take().get(); 
     if (r != null) 
      use(r); 
    } 
} 

Así, en su caso, todas las tareas será una sola Callable<Result>, y las tareas se agruparán en un Collection<Callable<Result>>.

Referencia: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html