2010-01-06 21 views
24

Decir que tengo una tarea como:¿Cuál es la forma más fácil de paralelizar una tarea en Java?

for(Object object: objects) { 
    Result result = compute(objects); 
    list.add(result); 
} 

¿Cuál es la forma más fácil de poner en paralelo cada uno de cómputo() (suponiendo que ya son paralelizable)?

No necesito una respuesta que coincida estrictamente con el código anterior, solo una respuesta general. Pero si necesita más información: mis tareas están vinculadas a IO y esto es para una aplicación web de Spring y las tareas se ejecutarán en una solicitud HTTP.

+4

¿Debería la segunda línea ser 'Result result = compute (object);'? – Carcigenicate

Respuesta

40

Recomendaría echar un vistazo a ExecutorService.

En particular, algo como esto:

ExecutorService EXEC = Executors.newCachedThreadPool(); 
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); 
for (final Object object: objects) { 
    Callable<Result> c = new Callable<Result>() { 
     @Override 
     public Result call() throws Exception { 
      return compute(object); 
     } 
    }; 
    tasks.add(c); 
} 
List<Future<Result>> results = EXEC.invokeAll(tasks); 

Tenga en cuenta que el uso de newCachedThreadPool podría ser malo si objects es una lista grande. ¡Un grupo de subprocesos en caché podría crear un subproceso por tarea! Es posible que desee utilizar newFixedThreadPool(n) donde n es algo razonable (como la cantidad de núcleos que tiene, suponiendo que compute() está vinculado a la CPU).

Aquí es código completo que realmente ejecuta:

import java.util.ArrayList; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 

public class ExecutorServiceExample { 
    private static final Random PRNG = new Random(); 

    private static class Result { 
     private final int wait; 
     public Result(int code) { 
      this.wait = code; 
     } 
    } 

    public static Result compute(Object obj) throws InterruptedException { 
     int wait = PRNG.nextInt(3000); 
     Thread.sleep(wait); 
     return new Result(wait); 
    } 

    public static void main(String[] args) throws InterruptedException, 
     ExecutionException { 
     List<Object> objects = new ArrayList<Object>(); 
     for (int i = 0; i < 100; i++) { 
      objects.add(new Object()); 
     } 

     List<Callable<Result>> tasks = new ArrayList<Callable<Result>>(); 
     for (final Object object : objects) { 
      Callable<Result> c = new Callable<Result>() { 
       @Override 
       public Result call() throws Exception { 
        return compute(object); 
       } 
      }; 
      tasks.add(c); 
     } 

     ExecutorService exec = Executors.newCachedThreadPool(); 
     // some other exectuors you could try to see the different behaviours 
     // ExecutorService exec = Executors.newFixedThreadPool(3); 
     // ExecutorService exec = Executors.newSingleThreadExecutor(); 
     try { 
      long start = System.currentTimeMillis(); 
      List<Future<Result>> results = exec.invokeAll(tasks); 
      int sum = 0; 
      for (Future<Result> fr : results) { 
       sum += fr.get().wait; 
       System.out.println(String.format("Task waited %d ms", 
        fr.get().wait)); 
      } 
      long elapsed = System.currentTimeMillis() - start; 
      System.out.println(String.format("Elapsed time: %d ms", elapsed)); 
      System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum/(elapsed * 1d))); 
     } finally { 
      exec.shutdown(); 
     } 
    } 
} 
+0

¿Hay una versión C# de esto? – Malfist

+1

También consulte a los ejecutores, que funciona como una fábrica para varios tipos de servicios de ejecutor. –

+0

@Malfist en C# hay tareas (bueno para el próximo .net 4) que hace que todo esto sea muy sencillo :). Y hay delegados/lambdas y threads, funcs, threadstart, etc. para hacerlo en 3.5 –

0

Uno puede crear un hilo y obtener el resultado.

Thread t = new Mythread(object); 

if (t.done()) { 
    // get result 
    // add result 
} 

EDIT: Creo que otras soluciones son más frías.

0

I iba a mencionar una clase ejecutor. Aquí hay un código de ejemplo que colocarías en la clase ejecutor.

private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4); 

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>(); 

    public void addCallable(Callable<Object> callable) { 
     this.callableList.add(callable); 
    } 

    public void clearCallables(){ 
     this.callableList.clear(); 
    } 

    public void executeThreads(){ 
     try { 
     threadLauncher.invokeAll(this.callableList); 
     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

    public Object[] getResult() { 

     List<Future<Object>> resultList = null; 
     Object[] resultArray = null; 
     try { 

      resultList = threadLauncher.invokeAll(this.callableList); 

      resultArray = new Object[resultList.size()]; 

      for (int i = 0; i < resultList.size(); i++) { 
       resultArray[i] = resultList.get(i).get(); 
      } 

     } catch (Exception e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     return resultArray; 
    } 

Luego de usar que le hacer llamadas a la clase ejecutor para poblar y ejecutarlo.

executor.addCallable(some implementation of callable) // do this once for each task 
Object[] results = executor.getResult(); 
+0

Siempre me molesta que no haya clases de contenedor para un conjunto de trabajos –

1

Aquí hay algo que utilizo en mis propios proyectos:

public class ParallelTasks 
{ 
    private final Collection<Runnable> tasks = new ArrayList<Runnable>(); 

    public ParallelTasks() 
    { 
    } 

    public void add(final Runnable task) 
    { 
     tasks.add(task); 
    } 

    public void go() throws InterruptedException 
    { 
     final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime() 
       .availableProcessors()); 
     try 
     { 
      final CountDownLatch latch = new CountDownLatch(tasks.size()); 
      for (final Runnable task : tasks) 
       threads.execute(new Runnable() { 
        public void run() 
        { 
         try 
         { 
          task.run(); 
         } 
         finally 
         { 
          latch.countDown(); 
         } 
        } 
       }); 
      latch.await(); 
     } 
     finally 
     { 
      threads.shutdown(); 
     } 
    } 
} 

// ... 

public static void main(final String[] args) throws Exception 
{ 
    ParallelTasks tasks = new ParallelTasks(); 
    final Runnable waitOneSecond = new Runnable() { 
     public void run() 
     { 
      try 
      { 
       Thread.sleep(1000); 
      } 
      catch (InterruptedException e) 
      { 
      } 
     } 
    }; 
    tasks.add(waitOneSecond); 
    tasks.add(waitOneSecond); 
    tasks.add(waitOneSecond); 
    tasks.add(waitOneSecond); 
    final long start = System.currentTimeMillis(); 
    tasks.go(); 
    System.err.println(System.currentTimeMillis() - start); 
} 

que imprime un poco más de 2.000 en mi caja de doble núcleo.

Cuestiones relacionadas