2009-04-30 10 views
43

Estoy implementando un mecanismo de agrupamiento de subprocesos en el que me gustaría ejecutar tareas de diferentes prioridades. Me gustaría tener un buen mecanismo mediante el cual pueda enviar una tarea de alta prioridad al servicio y programarla antes de otras tareas. La prioridad de la tarea es una propiedad intrínseca de la tarea en sí misma (si expreso esa tarea como Callable o Runnable no es importante para mí).¿Cómo implemento la priorización de tareas usando un ExecutorService en Java 5?

Ahora, superficialmente parece que podría utilizar un PriorityBlockingQueue como la cola de tareas en mi ThreadPoolExecutor, pero esa cola contiene Runnable objetos, que pueden o no ser las tareas Runnable He presentado a la misma. Además, si he enviado tareas de Callable, no está claro cómo se mapearía.

¿Hay alguna manera de hacerlo? Realmente preferiría no hacer lo mío por esto, ya que es mucho más probable que me equivoque de esa manera.

(Un aparte; sí, soy consciente de la posibilidad de morir de hambre para trabajos de menor prioridad en algo como esto puntos extra() para soluciones que tienen una garantía razonable de equidad.?!)

+3

Interesante pregunta. Esto parece un poco un descuido en la API, en mi opinión. –

+0

Si tuviera que adivinar por qué no forma parte de la API, diría que probablemente sea porque la cuestión del hambre es complicada. Tendrían que proporcionar un nuevo conjunto de primitivas para la equidad y la escalada; cosas como must-execute-by y may-be-indefinitely-deferred (tenga en cuenta que estoy sacando estos nombres de mi trasero). Me gustaría que lo hubieran hecho, pero no los culpo :) –

+0

Sí, eso tiene sentido. Parece que sería algo bueno tener, sin embargo, pero cuando crees que necesitas esencialmente escribir un algoritmo de programación de CPU en Java, probablemente estés haciendo algo mal. –

Respuesta

8

A primera vista parecería que podría definir una interfaz para las tareas que se extiende Runnable o Callable<T> y Comparable. Luego ajuste un ThreadPoolExecutor con un PriorityBlockingQueue como la cola, y solo acepte tareas que implementen su interfaz.

Teniendo en cuenta su comentario, parece que una opción es ampliar ThreadPoolExecutor y anular los métodos submit(). Consulte AbstractExecutorService para ver cuáles son los predeterminados; todo lo que hacen es envolver el Runnable o Callable en un FutureTask y execute(). Probablemente haga esto escribiendo una clase contenedora que implemente ExecutorService y delegue en un ThreadPoolExecutor interno anónimo. Envuélvalos en algo que tenga su prioridad, para que su Comparator pueda obtenerlo.

+2

Esa fue mi opinión, también, pero aquí está el problema; las instancias 'Runnable' que se pasan a la cola de prioridad son _no_ las tareas que' presento' directamente, están envueltas en una 'java.util.concurrent.FutureTask ' que, por supuesto, no está ordenada de la misma manera. Si uso 'execute' - que no, por ejemplo, acepta' Callable' - entonces arroja mis propios objetos. –

+0

Hmm, eso complica las cosas. Pensé que había algo que me estaba perdiendo. –

+0

Lo diré. Todavía lo estoy superando, pero ... Bueno, basta decir que es un dolor :) –

0

Would ¿Es posible tener un ThreadPoolExecutor para cada nivel de prioridad? Se puede instanciar un ThreadPoolExecutor con un ThreadFactory y puede tener su propia implementación de un ThreadFactory para establecer los diferentes niveles de prioridad.

class MaxPriorityThreadFactory implements ThreadFactory { 
    public Thread newThread(Runnable r) { 
     Thread thread = new Thread(r); 
     thread.setPriority(Thread.MAX_PRIORITY); 
    } 
} 
+1

La prioridad del subproceso no es realmente importante para mí aquí; las tareas en sí mismas tenderán a ejecutarse razonablemente rápidamente (el objetivo es conseguirlas a ~ 50ms cada una), por lo que la programación de hilos es un problema menor. Es la prioridad de las tareas relativas entre sí lo que está en juego aquí. –

+0

¿Deben ejecutarse en un orden determinado? – willcodejavaforfood

+0

No hay nadie, el verdadero orden, no, pero las tareas que llegan más tarde pero tienen una prioridad más alta _debe_ ejecutarse antes que las tareas que llegan más tarde, pero son de menor prioridad. Nuevamente, con alguna garantía de imparcialidad para evitar la inanición. –

16

He resuelto este problema de manera razonable, y lo describiré a continuación para futuras referencias a mí mismo y a cualquier otra persona que tenga este problema con las bibliotecas de Java Concurrent.

El uso de PriorityBlockingQueue como medio para realizar tareas para su posterior ejecución es, de hecho, un movimiento en la dirección correcta. El problema es que el PriorityBlockingQueue debe crearse genéricamente para contener Runnable instancias, y es imposible llamar al compareTo (o similar) en una interfaz Runnable.

Para resolver el problema. Al crear el Ejecutor, se debe dar un PriorityBlockingQueue.La cola más se debe dar un comparador personalizado para hacer adecuado en el lugar de clasificación:

new PriorityBlockingQueue<Runnable>(size, new CustomTaskComparator()); 

Ahora, un vistazo a CustomTaskComparator:

public class CustomTaskComparator implements Comparator<MyType> { 

    @Override 
    public int compare(MyType first, MyType second) { 
     return comparison; 
    } 

} 

Todo parece bastante sencillo hasta este punto. Se pone un poco pegajoso aquí. Nuestro próximo problema es tratar con la creación de FutureTasks del Ejecutor. En el Ejecutor, debemos anular newTaskFor como tan:

@Override 
protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) { 
    //Override the default FutureTask creation and retrofit it with 
    //a custom task. This is done so that prioritization can be accomplished. 
    return new CustomFutureTask(c); 
} 

Dónde c es el Callable tarea que estamos tratando de ejecutar. Ahora, vamos a echar un vistazo a CustomFutureTask:

public class CustomFutureTask extends FutureTask { 

    private CustomTask task; 

    public CustomFutureTask(Callable callable) { 
     super(callable); 
     this.task = (CustomTask) callable; 
    } 

    public CustomTask getTask() { 
     return task; 
    } 

} 

Aviso el método getTask. Lo usaremos más tarde para tomar la tarea original de este CustomFutureTask que hemos creado.

Y, por último, vamos a modificar la tarea original que estábamos tratando de ejecutar:

public class CustomTask implements Callable<MyType>, Comparable<CustomTask> { 

    private final MyType myType; 

    public CustomTask(MyType myType) { 
     this.myType = myType; 
    } 

    @Override 
    public MyType call() { 
     //Do some things, return something for FutureTask implementation of `call`. 
     return myType; 
    } 

    @Override 
    public int compareTo(MyType task2) { 
     return new CustomTaskComparator().compare(this.myType, task2.myType); 
    } 

} 

Se puede ver que ponemos en práctica Comparable en la tarea de delegar a la actual Comparator para MyType.

Y ahí lo tienes, priorización personalizada para un ejecutor utilizando las bibliotecas de Java. Se necesita un poco de flexión, pero es el más limpio que he podido inventar. ¡Espero que esto sea útil para alguien!

+1

Existen algunas limitaciones inherentes a este mecanismo. Por ejemplo, el primer ejecutable/invocable pasado al ejecutor no entra en la cola. Por lo tanto, el mecanismo de prioridad solo se aplicará cuando las tareas estén en cola, y esto sucede cuando el número de corredores actuales excede el número máximo de subprocesos en el tamaño del grupo (aquí 1). – Snicolas

+0

En CustomTask, no debe instanciar un objeto para cada comparación, esto va a ralentizar mucho las cosas. – Snicolas

+0

¿dónde se usa getTask? –

4

Puede utilizar estas clases de ayuda:

public class PriorityFuture<T> implements RunnableFuture<T> { 

    private RunnableFuture<T> src; 
    private int priority; 

    public PriorityFuture(RunnableFuture<T> other, int priority) { 
     this.src = other; 
     this.priority = priority; 
    } 

    public int getPriority() { 
     return priority; 
    } 

    public boolean cancel(boolean mayInterruptIfRunning) { 
     return src.cancel(mayInterruptIfRunning); 
    } 

    public boolean isCancelled() { 
     return src.isCancelled(); 
    } 

    public boolean isDone() { 
     return src.isDone(); 
    } 

    public T get() throws InterruptedException, ExecutionException { 
     return src.get(); 
    } 

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     return src.get(timeout, unit); 
    } 

    public void run() { 
     src.run(); 
    } 

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() { 
     public int compare(Runnable o1, Runnable o2) { 
      if (o1 == null && o2 == null) 
       return 0; 
      else if (o1 == null) 
       return -1; 
      else if (o2 == null) 
       return 1; 
      else { 
       int p1 = ((PriorityFuture<?>) o1).getPriority(); 
       int p2 = ((PriorityFuture<?>) o2).getPriority(); 

       return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); 
      } 
     } 
    }; 
} 

Y

public interface PriorityCallable<T> extends Callable<T> { 

    int getPriority(); 

} 

Y este método de ayuda:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
      new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) { 

     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
      RunnableFuture<T> newTaskFor = super.newTaskFor(callable); 
      return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority()); 
     } 
    }; 
} 

Y entonces úsala así:

class LenthyJob implements PriorityCallable<Long> { 
    private int priority; 

    public LenthyJob(int priority) { 
     this.priority = priority; 
    } 

    public Long call() throws Exception { 
     System.out.println("Executing: " + priority); 
     long num = 1000000; 
     for (int i = 0; i < 1000000; i++) { 
      num *= Math.random() * 1000; 
      num /= Math.random() * 1000; 
      if (num == 0) 
       num = 1000000; 
     } 
     return num; 
    } 

    public int getPriority() { 
     return priority; 
    } 
} 

public class TestPQ { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     ThreadPoolExecutor exec = getPriorityExecutor(2); 

     for (int i = 0; i < 20; i++) { 
      int priority = (int) (Math.random() * 100); 
      System.out.println("Scheduling: " + priority); 
      LenthyJob job = new LenthyJob(priority); 
      exec.submit(job); 
     } 
    } 
} 
+0

@Snicolas el cambio que ha realizado no se compila. – assylias

+0

@assyslias, ¿qué versión de JDK usas? – Snicolas

+0

Solo una palabra para decir que esta respuesta también es buena. – Snicolas

3

Intentaré explicar este problema con un código completamente funcional. Pero antes de sumergirme en el código, me gustaría explicar sobre PriorityBlockingQueue

PriorityBlockingQueue: PriorityBlockingQueue es una implementación de BlockingQueue. Acepta las tareas junto con su prioridad y envía la tarea con la más alta prioridad para su ejecución primero. Si dos tareas tienen la misma prioridad, debemos proporcionar alguna lógica personalizada para decidir qué tarea va primero.

Ahora entremos en el código inmediatamente.

Clase de controlador: Esta clase crea un ejecutor que acepta tareas y luego las envía para su ejecución. Aquí creamos dos tareas, una con prioridad baja y otra con prioridad ALTA. Aquí le decimos al ejecutor que ejecute un MAX de 1 subprocesos y use PriorityBlockingQueue.

 public static void main(String[] args) { 

     /* 
     Minimum number of threads that must be running : 0 
     Maximium number of threads that can be created : 1 
     If a thread is idle, then the minimum time to keep it alive : 1000 
     Which queue to use : PriorityBlockingQueue 
     */ 
    PriorityBlockingQueue queue = new PriorityBlockingQueue(); 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 
     1000, TimeUnit.MILLISECONDS,queue); 


    MyTask task = new MyTask(Priority.LOW,"Low"); 
    executor.execute(new MyFutureTask(task)); 
    task = new MyTask(Priority.HIGH,"High"); 
    executor.execute(new MyFutureTask(task)); 
    task = new MyTask(Priority.MEDIUM,"Medium"); 
    executor.execute(new MyFutureTask(task)); 

} 

clase MyTask: MyTask implementa Ejecutable y acepta prioridad como un argumento en el constructor. Cuando se ejecuta esta tarea, imprime un mensaje y luego pone el hilo en modo de suspensión durante 1 segundo.

public class MyTask implements Runnable { 

    public int getPriority() { 
    return priority.getValue(); 
    } 

    private Priority priority; 

    public String getName() { 
    return name; 
    } 

    private String name; 

    public MyTask(Priority priority,String name){ 
    this.priority = priority; 
    this.name = name; 
    } 

    @Override 
    public void run() { 
    System.out.println("The following Runnable is getting executed "+getName()); 
    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

} 

clase MyFutureTask: Ya que estamos usando PriorityBlocingQueue para la celebración de nuestras tareas, las tareas debe ser envuelto en el interior FutureTask y nuestra aplicación de FutureTask debe implementar la interfaz Comparable. La interfaz Comparable compara la prioridad de 2 tareas diferentes y envía la tarea con la mayor prioridad para la ejecución.

public class MyFutureTask extends FutureTask<MyFutureTask> 
     implements Comparable<MyFutureTask> { 

    private MyTask task = null; 

    public MyFutureTask(MyTask task){ 
     super(task,null); 
     this.task = task; 
    } 

    @Override 
    public int compareTo(MyFutureTask another) { 
     return task.getPriority() - another.task.getPriority(); 
    } 
    } 

Clase de prioridad: explica por sí misma clase de prioridad.

public enum Priority { 

    HIGHEST(0), 
    HIGH(1), 
    MEDIUM(2), 
    LOW(3), 
    LOWEST(4); 

    int value; 

    Priority(int val) { 
    this.value = val; 
    } 

    public int getValue(){ 
    return value; 
    } 


} 

Ahora cuando nos encontramos este ejemplo, obtenemos el siguiente resultado

The following Runnable is getting executed High 
The following Runnable is getting executed Medium 
The following Runnable is getting executed Low 

A pesar de que hemos presentado la baja prioridad en primer lugar, pero tarea de alta prioridad más tarde, pero ya que estamos utilizando un PriorityBlockingQueue, cualquier la tarea con una prioridad más alta se ejecutará primero.

+0

Crear tareas High1, High2, Low1, Low2, Low3, y su ejecución se baraja dentro del nivel de prioridad. Se necesita una solución que preserve el orden de envío para las mismas tareas de prioridad –

1

Mi solución preserva el orden de submisión de tareas para las mismas prioridades. Es una mejora de este answer

Tarea orden de ejecución se basa en:

  1. Prioridad
  2. Enviar pedido (dentro de la misma prioridad)
  3. clase

Tester:

public class Main { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 

     ExecutorService executorService = PriorityExecutors.newFixedThreadPool(1); 

     //Priority=0 
     executorService.submit(newCallable("A1", 200));  //Defaults to priority=0 
     executorService.execute(newRunnable("A2", 200)); //Defaults to priority=0 
     executorService.submit(PriorityCallable.of(newCallable("A3", 200), 0)); 
     executorService.submit(PriorityRunnable.of(newRunnable("A4", 200), 0)); 
     executorService.execute(PriorityRunnable.of(newRunnable("A5", 200), 0)); 
     executorService.submit(PriorityRunnable.of(newRunnable("A6", 200), 0)); 
     executorService.execute(PriorityRunnable.of(newRunnable("A7", 200), 0)); 
     executorService.execute(PriorityRunnable.of(newRunnable("A8", 200), 0)); 

     //Priority=1 
     executorService.submit(PriorityRunnable.of(newRunnable("B1", 200), 1)); 
     executorService.submit(PriorityRunnable.of(newRunnable("B2", 200), 1)); 
     executorService.submit(PriorityCallable.of(newCallable("B3", 200), 1)); 
     executorService.execute(PriorityRunnable.of(newRunnable("B4", 200), 1)); 
     executorService.submit(PriorityRunnable.of(newRunnable("B5", 200), 1)); 

     executorService.shutdown(); 

    } 

    private static Runnable newRunnable(String name, int delay) { 
     return new Runnable() { 
      @Override 
      public void run() { 
       System.out.println(name); 
       sleep(delay); 
      } 
     }; 
    } 

    private static Callable<Integer> newCallable(String name, int delay) { 
     return new Callable<Integer>() { 
      @Override 
      public Integer call() throws Exception { 
       System.out.println(name); 
       sleep(delay); 
       return 10; 
      } 
     }; 
    } 

    private static void sleep(long millis) { 
     try { 
      Thread.sleep(millis); 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
      throw new RuntimeException(e); 
     } 
    } 

} 

Resultado:

A1 B1 B2 B3 B4 B5 A2 A3 A4 A5 A6 A7 A8

primera tarea es A1 porque no había más alta prioridad en la cola cuando se insertó. tareas B son 1 prioridad por lo ejecutado anteriormente, las tareas A son 0 prioridad, de manera ejecutados más tarde, pero el orden de ejecución es la siguiente orden submition: B1, B2, B3, ... A2, A3, A4 ...

La solución:

public class PriorityExecutors { 

    public static ExecutorService newFixedThreadPool(int nThreads) { 
     return new PriorityExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS); 
    } 

    private static class PriorityExecutor extends ThreadPoolExecutor { 
     private static final int DEFAULT_PRIORITY = 0; 
     private static AtomicLong instanceCounter = new AtomicLong(); 

     @SuppressWarnings({"unchecked"}) 
     public PriorityExecutor(int corePoolSize, int maximumPoolSize, 
       long keepAliveTime, TimeUnit unit) { 
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, (BlockingQueue) new PriorityBlockingQueue<ComparableTask>(10, 
        ComparableTask.comparatorByPriorityAndSequentialOrder())); 
     } 

     @Override 
     public void execute(Runnable command) { 
      // If this is ugly then delegator pattern needed 
      if (command instanceof ComparableTask) //Already wrapped 
       super.execute(command); 
      else { 
       super.execute(newComparableRunnableFor(command)); 
      } 
     } 

     private Runnable newComparableRunnableFor(Runnable runnable) { 
      return new ComparableRunnable(ensurePriorityRunnable(runnable)); 
     } 

     @Override 
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
      return new ComparableFutureTask<>(ensurePriorityCallable(callable)); 
     } 

     @Override 
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
      return new ComparableFutureTask<>(ensurePriorityRunnable(runnable), value); 
     } 

     private <T> PriorityCallable<T> ensurePriorityCallable(Callable<T> callable) { 
      return (callable instanceof PriorityCallable) ? (PriorityCallable<T>) callable 
        : PriorityCallable.of(callable, DEFAULT_PRIORITY); 
     } 

     private PriorityRunnable ensurePriorityRunnable(Runnable runnable) { 
      return (runnable instanceof PriorityRunnable) ? (PriorityRunnable) runnable 
        : PriorityRunnable.of(runnable, DEFAULT_PRIORITY); 
     } 

     private class ComparableFutureTask<T> extends FutureTask<T> implements ComparableTask { 
      private Long sequentialOrder = instanceCounter.getAndIncrement(); 
      private HasPriority hasPriority; 

      public ComparableFutureTask(PriorityCallable<T> priorityCallable) { 
       super(priorityCallable); 
       this.hasPriority = priorityCallable; 
      } 

      public ComparableFutureTask(PriorityRunnable priorityRunnable, T result) { 
       super(priorityRunnable, result); 
       this.hasPriority = priorityRunnable; 
      } 

      @Override 
      public long getInstanceCount() { 
       return sequentialOrder; 
      } 

      @Override 
      public int getPriority() { 
       return hasPriority.getPriority(); 
      } 
     } 

     private static class ComparableRunnable implements Runnable, ComparableTask { 
      private Long instanceCount = instanceCounter.getAndIncrement(); 
      private HasPriority hasPriority; 
      private Runnable runnable; 

      public ComparableRunnable(PriorityRunnable priorityRunnable) { 
       this.runnable = priorityRunnable; 
       this.hasPriority = priorityRunnable; 
      } 

      @Override 
      public void run() { 
       runnable.run(); 
      } 

      @Override 
      public int getPriority() { 
       return hasPriority.getPriority(); 
      } 

      @Override 
      public long getInstanceCount() { 
       return instanceCount; 
      } 
     } 

     private interface ComparableTask extends Runnable { 
      int getPriority(); 

      long getInstanceCount(); 

      public static Comparator<ComparableTask> comparatorByPriorityAndSequentialOrder() { 
       return (o1, o2) -> { 
        int priorityResult = o2.getPriority() - o1.getPriority(); 
        return priorityResult != 0 ? priorityResult 
          : (int) (o1.getInstanceCount() - o2.getInstanceCount()); 
       }; 
      } 

     } 

    } 

    private static interface HasPriority { 
     int getPriority(); 
    } 

    public interface PriorityCallable<V> extends Callable<V>, HasPriority { 

     public static <V> PriorityCallable<V> of(Callable<V> callable, int priority) { 
      return new PriorityCallable<V>() { 
       @Override 
       public V call() throws Exception { 
        return callable.call(); 
       } 

       @Override 
       public int getPriority() { 
        return priority; 
       } 
      }; 
     } 
    } 

    public interface PriorityRunnable extends Runnable, HasPriority { 

     public static PriorityRunnable of(Runnable runnable, int priority) { 
      return new PriorityRunnable() { 
       @Override 
       public void run() { 
        runnable.run(); 
       } 

       @Override 
       public int getPriority() { 
        return priority; 
       } 
      }; 
     } 
    } 

} 
+0

Actualización: no está funcionando junto con el detector de audio de Guava .... :( –

Cuestiones relacionadas