2012-06-01 9 views
9

Necesito encontrar una forma de ejecutar tareas (dependientes e independientes) en paralelo en Java.Ejecutando tareas Dependientes en paralelo en Java

  1. La Tarea A y la Tarea C se pueden ejecutar de forma independiente.
  2. Tarea B depende de la salida de la tarea A.

he comprobado java.util.concurrent Futuro y Tenedor/Join, pero parece que no podemos añadir la dependencia a una tarea.

¿Alguien me puede indicar que corrija la API de Java?

+0

¿Ha considerado que la Tarea A notifique la Tarea B cuando esté completa? Antes de comenzar la Tarea A, crea una instancia de la Tarea B y agrégala como observador a la Tarea A (Ver Patrón del Observador). –

+0

Guava ['ListenableFuture'] (http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained) es un poco más amigable acerca de estas cosas que los futuros simples. –

Respuesta

0

Lo que se necesita es un CountDownLatch.

final CountDownLatch gate = new CountDownLatch(2); 
// thread a 
new Thread() { 
    public void run() { 
     // process 
     gate.countDown(); 
    } 
}.start(); 

// thread c 
new Thread() { 
    public void run() { 
     // process 
     gate.countDown(); 
    } 
}.start(); 

new Thread() { 
    public void run() { 
     try { 
      gate.await(); 
      // both thread a and thread c have completed 
      // process thread b 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

    } 
}.start(); 

Como alternativa, dependiendo de su escenario, es posible que también sea capaz de utilizar un BlockingQueue para implementar el patrón del productor-consumidor. Vea el ejemplo en la página de documentación.

+0

Un 'CountDownLatch' es excesivo aquí, y de acuerdo con el OP, la tarea B depende solo de la tarea A, no de las tareas A y C. Dicho esto, el -1 es simplemente para no manejar correctamente la' InterruptedException' en el fragmento. –

+0

Gracias, la idea del fragmento de código fue mostrarle cómo funciona CountDownLatch, no mostrarle cómo manejar las excepciones correctamente. – Jeshurun

0

Si la tarea B depende de la salida de la tarea de A, me gustaría primera cuestión, si la tarea B o en realidad no es una tarea independiente. La separación de las tareas tendría sentido si no existe:

  • Una cierta cantidad no trivial de trabajo que la tarea B puede hacer antes de tener los resultados de la tarea de un
  • tarea B es un proceso continuo de largo que se encarga de la producción de muchas instancias diferentes de la tarea a
  • Hay algunas otras tareas (dicen D) que también utilizan tarea da como resultado de a

Suponiendo que sea una tarea independiente, entonces se puede permitir que la tarea a & B a compartir una tarea un BlockingQueue tal que puede pasar los datos de la tarea B

10

En Scala esto es muy fácil de hacer, y creo que es mejor usar Scala. Aquí hay un ejemplo que extraje de aquí http://danielwestheide.com/ (Guía del Neófito de Scala Parte 16: Dónde ir desde aquí) este tipo tiene un gran blog (no soy ese tipo)

Tomemos un barrista haciendo café.Las tareas a realizar son:

  1. moler los granos requeridos café (no hay tareas anteriores)
  2. calor un poco de agua (no hay tareas anteriores)
  3. Brew un espresso con el café molido y el agua caliente (depende 1 & 2)
  4. espuma un poco de leche (no hay tareas anteriores)
  5. combinan la espuma de leche y el café (depende 3,4)

o como un árbol:

Grind _ 
Coffe \ 
      \ 
Heat ___\_Brew____ 
Water    \_____Combine 
        /
Foam ____________/ 
Milk 

en Java utilizando la API de concurrencia esto sería:

import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.FutureTask; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 

public class Barrista { 

    static class HeatWater implements Callable<String> { 
     @Override 
     public String call() throws Exception { 
      System.out.println("Heating Water"); 
      Thread.sleep(1000); 
      return "hot water"; 
     } 
    } 

    static class GrindBeans implements Callable<String> { 
     @Override 
     public String call() throws Exception { 
      System.out.println("Grinding Beans"); 
      Thread.sleep(2000); 
      return "grinded beans"; 
     } 
    } 

    static class Brew implements Callable<String> { 

     final Future<String> grindedBeans; 
     final Future<String> hotWater; 

     public Brew(Future<String> grindedBeans, Future<String> hotWater) { 
      this.grindedBeans = grindedBeans; 
      this.hotWater = hotWater; 
     } 

     @Override 
     public String call() throws Exception 
     { 
      System.out.println("brewing coffee with " + grindedBeans.get() 
        + " and " + hotWater.get()); 
      Thread.sleep(1000); 
      return "brewed coffee"; 
     } 
    } 

    static class FrothMilk implements Callable<String> { 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000); 
      return "some milk"; 
     } 
    } 

    static class Combine implements Callable<String> { 

     public Combine(Future<String> frothedMilk, Future<String> brewedCoffee) { 
      super(); 
      this.frothedMilk = frothedMilk; 
      this.brewedCoffee = brewedCoffee; 
     } 

     final Future<String> frothedMilk; 
     final Future<String> brewedCoffee; 

     @Override 
     public String call() throws Exception { 
      Thread.sleep(1000); 
      System.out.println("Combining " + frothedMilk.get() + " " 
        + brewedCoffee.get()); 
      return "Final Coffee"; 
     } 

    } 

    public static void main(String[] args) { 

     ExecutorService executor = Executors.newFixedThreadPool(2); 

     FutureTask<String> heatWaterFuture = new FutureTask<String>(new HeatWater()); 
     FutureTask<String> grindBeans = new FutureTask<String>(new GrindBeans()); 
     FutureTask<String> brewCoffee = new FutureTask<String>(new Brew(grindBeans, heatWaterFuture)); 
     FutureTask<String> frothMilk = new FutureTask<String>(new FrothMilk()); 
     FutureTask<String> combineCoffee = new FutureTask<String>(new Combine(frothMilk, brewCoffee)); 

     executor.execute(heatWaterFuture); 
     executor.execute(grindBeans); 
     executor.execute(brewCoffee); 
     executor.execute(frothMilk); 
     executor.execute(combineCoffee); 


     try { 

      /** 
      * Warning this code is blocking !!!!!!! 
      */   
      System.out.println(combineCoffee.get(20, TimeUnit.SECONDS)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } catch (ExecutionException e) { 
      e.printStackTrace(); 
     } catch (TimeoutException e) { 
      System.out.println("20 SECONDS FOR A COFFEE !!!! I am [email protected]#! leaving!!"); 
      e.printStackTrace(); 
     } finally{ 
       executor.shutdown(); 
      } 
     } 
    } 

Asegúrese de que agrega los tiempos de espera, aunque para asegurarse de que su código no esperará para siempre en algo complete, eso se hace usando Future.get (long, TimeUnit) y luego maneja la falla en consecuencia.

Es mucho mejor en Scala Sin embargo, aquí es como si fuera en el blog: El código para preparar un café sería algo como esto:

def prepareCappuccino(): Try[Cappuccino] = for { 
    ground <- Try(grind("arabica beans")) 
    water <- Try(heatWater(Water(25))) 
    espresso <- Try(brew(ground, water)) 
    foam <- Try(frothMilk("milk")) 
} yield combine(espresso, foam) 

donde todos los métodos devuelven un futuro (mecanografiado futuro), por ejemplo molienda sería algo como esto:

def grind(beans: CoffeeBeans): Future[GroundCoffee] = Future { 
    // grinding function contents 
} 

para todas las implementaciones echa un vistazo al blog, pero eso es todo lo que hay que hacer. También puede integrar Scala y Java fácilmente. Realmente recomiendo hacer este tipo de cosas en Scala en lugar de Java. Scala requiere mucho menos código, mucho más limpio y controlado por eventos.

0

Hay una biblioteca de Java específicamente para este propósito (Negación: Soy el dueño de esta biblioteca) llamada Dexecutor

Aquí es cómo se puede lograr el resultado deseado, se puede leer más sobre él here

@Test 
public void testDependentTaskExecution() { 

    DefaultDependentTasksExecutor<String, String> executor = newTaskExecutor(); 

    executor.addDependency("A", "B"); 
    executor.addIndependent("C"); 

    executor.execute(ExecutionBehavior.RETRY_ONCE_TERMINATING); 

} 

private DefaultDependentTasksExecutor<String, String> newTaskExecutor() { 
    return new DefaultDependentTasksExecutor<String, String>(newExecutor(), new SleepyTaskProvider()); 
} 

private ExecutorService newExecutor() { 
    return Executors.newFixedThreadPool(ThreadPoolUtil.ioIntesivePoolSize()); 
} 

private static class SleepyTaskProvider implements TaskProvider<String, String> { 

    public Task<String, String> provid(final String id) { 

     return new Task<String, String>() { 

      @Override 
      public String execute() { 
       try { 
        //Perform some task 
        Thread.sleep(500); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       String result = id + "processed"; 
       return result; 
      } 

      @Override 
      public boolean shouldExecute(ExecutionResults<String, String> parentResults) { 
       ExecutionResult<String, String> firstParentResult = parentResults.getFirst(); 
       //Do some logic with parent result 
       if ("B".equals(id) && firstParentResult.isSkipped()) { 
        return false; 
       } 
       return true; 
      } 
     };   
    } 

} 
Cuestiones relacionadas