2012-03-08 14 views
6

Estoy buscando tener un ThreadPoolExecutor donde puedo configurar corePoolSize y maximumPoolSize y lo que sucede es que la cola transferiría la tarea inmediatamente al grupo de subprocesos y así crear nuevos subprocesos hasta que llegue al maximumPoolSize y luego comenzar a agregar a una cola .Estrategia de Java ThreadPoolExecutor, 'Transferencia directa' con cola?

¿Existe tal cosa? Si no, ¿hay alguna buena razón por la que no tiene una estrategia de este tipo?

Lo que quiero en esencia es que las tareas se envíen para su ejecución y cuando llegue a un punto en el que esencialmente obtendrá el peor rendimiento al tener demasiados hilos (al establecer maximumPoolSize), dejará de agregar nuevos hilos y trabaje con ese grupo de subprocesos y empiece a hacer cola, luego, si la cola está llena, rechaza.

Y cuando la carga vuelve a bajar, puede comenzar el desmantelamiento de los hilos que no se utilizan en el núcleo de la base.

Esto tiene más sentido para mí en mi aplicación de los 'tres estrategias generales' que figuran en http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html

+0

Posible duplicado: http://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with- a-size-limit – mnicky

Respuesta

3

Nota: estas implementaciones son algo defectuoso y no determinista. Lea la respuesta completa y los comentarios antes de usar este código.

¿Qué le parece crear una cola de trabajos que rechaza elementos mientras el ejecutor está por debajo del tamaño máximo de la agrupación y comienza a aceptarlos una vez que se alcanza el máximo?

Esto se basa en el comportamiento documentado:

"Si la solicitud no puede ponerse en cola, se crea un nuevo hilo a menos que esta excedería maximumPoolSize, en cuyo caso, la tarea será rechazaron"

public class ExecutorTest 
{ 
    private static final int CORE_POOL_SIZE = 2; 
    private static final int MAXIMUM_POOL_SIZE = 4; 
    private static final int KEEP_ALIVE_TIME_MS = 5000; 

    public static void main(String[] args) 
    { 
     final SaturateExecutorBlockingQueue workQueue = 
      new SaturateExecutorBlockingQueue(); 

     final ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE, 
        KEEP_ALIVE_TIME_MS, 
        TimeUnit.MILLISECONDS, 
        workQueue); 

     workQueue.setExecutor(executor); 

     for (int i = 0; i < 6; i++) 
     { 
      final int index = i; 
      executor.submit(new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         Thread.sleep(1000); 
        } 
        catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 

        System.out.println("Runnable " + index 
          + " on thread: " + Thread.currentThread()); 
       } 
      }); 
     } 
    } 

    public static class SaturateExecutorBlockingQueue 
     extends LinkedBlockingQueue<Runnable> 
    { 
     private ThreadPoolExecutor executor; 

     public void setExecutor(ThreadPoolExecutor executor) 
     { 
      this.executor = executor; 
     } 

     public boolean offer(Runnable e) 
     { 
      if (executor.getPoolSize() < executor.getMaximumPoolSize()) 
      { 
       return false; 
      } 
      return super.offer(e); 
     } 
    } 
} 

Nota: Su pregunta me sorprendió porque esperaba que su comportamiento deseado a ser el comportamiento predeterminado de una ThreadPoolExecutor configurado con un corePoolSize < maximumPoolSize. Pero como usted señala, el JavaDoc para ThreadPoolExecutor establece claramente lo contrario.


Idea # 2

creo que tengo lo que es probablemente un enfoque ligeramente mejor. Se basa en el comportamiento de efectos secundarios codificado en el método setCorePoolSize en ThreadPoolExecutor. La idea es aumentar de forma temporal y condicional el tamaño de la agrupación de núcleos cuando se pone en cola un elemento de trabajo. Al aumentar el tamaño del grupo de núcleos, el ThreadPoolExecutor generará de inmediato suficientes hilos nuevos para ejecutar todas las tareas en cola (queue.size()). Luego, disminuimos de inmediato el tamaño del grupo de núcleos, lo que permite que el grupo de subprocesos se reduzca de forma natural durante períodos futuros de baja actividad. Este enfoque aún no es completamente determinista (es posible que el tamaño del grupo crezca por encima del tamaño máximo de la agrupación, por ejemplo), pero creo que en casi todos los casos es mejor que la primera estrategia.

En concreto, creo que este enfoque es mejor que la primera porque:

  1. Se reutilizará las discusiones más a menudo
  2. No rechazará la ejecución como resultado de una carrera
  3. me gustaría Mencione nuevamente que el primer enfoque hace que el grupo de subprocesos crezca a su tamaño máximo incluso bajo un uso muy ligero. Este enfoque debería ser mucho más eficiente en ese sentido.

-

public class ExecutorTest2 
{ 
    private static final int KEEP_ALIVE_TIME_MS = 5000; 
    private static final int CORE_POOL_SIZE = 2; 
    private static final int MAXIMUM_POOL_SIZE = 4; 

    public static void main(String[] args) throws InterruptedException 
    { 
     final SaturateExecutorBlockingQueue workQueue = 
      new SaturateExecutorBlockingQueue(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE); 

     final ThreadPoolExecutor executor = 
      new ThreadPoolExecutor(CORE_POOL_SIZE, 
        MAXIMUM_POOL_SIZE, 
        KEEP_ALIVE_TIME_MS, 
        TimeUnit.MILLISECONDS, 
        workQueue); 

     workQueue.setExecutor(executor); 

     for (int i = 0; i < 60; i++) 
     { 
      final int index = i; 
      executor.submit(new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         Thread.sleep(1000); 
        } 
        catch (InterruptedException e) 
        { 
         e.printStackTrace(); 
        } 

        System.out.println("Runnable " + index 
          + " on thread: " + Thread.currentThread() 
          + " poolSize: " + executor.getPoolSize()); 
       } 
      }); 
     } 

     executor.shutdown(); 

     executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); 
    } 

    public static class SaturateExecutorBlockingQueue 
     extends LinkedBlockingQueue<Runnable> 
    { 
     private final int corePoolSize; 
     private final int maximumPoolSize; 
     private ThreadPoolExecutor executor; 

     public SaturateExecutorBlockingQueue(int corePoolSize, 
       int maximumPoolSize) 
     { 
      this.corePoolSize = corePoolSize; 
      this.maximumPoolSize = maximumPoolSize; 
     } 

     public void setExecutor(ThreadPoolExecutor executor) 
     { 
      this.executor = executor; 
     } 

     public boolean offer(Runnable e) 
     { 
      if (super.offer(e) == false) 
      { 
       return false; 
      } 
      // Uncomment one or both of the below lines to increase 
      // the likelyhood of the threadpool reusing an existing thread 
      // vs. spawning a new one. 
      //Thread.yield(); 
      //Thread.sleep(0); 
      int currentPoolSize = executor.getPoolSize(); 
      if (currentPoolSize < maximumPoolSize 
        && currentPoolSize >= corePoolSize) 
      { 
       executor.setCorePoolSize(currentPoolSize + 1); 
       executor.setCorePoolSize(corePoolSize); 
      } 
      return true; 
     } 
    } 
} 
+0

Bueno, lo que sugirió es lo que dije en el comentario en la respuesta ahora eliminada. De todas formas. Creo que encontrará que la oferta principal puede causar problemas ya que no tiene un bloqueo. ¿Incluso si solo son 3 instrucciones? o tan largo –

+0

@RonaldChan Me preocupaba lo mismo, y he estado tratando de razonar a través de los posibles problemas, pero todavía tengo que atacar a ninguno. Continuaré tratando de pensarlo. –

+0

Acepte su respuesta de todos modos. Esto es lo que implementé al final, en lugar de intentar reescribir el método de ejecución de ThreadPoolExectuor, que es mucho más complicado. FYI También agregué un buffer en maximumPoolSize para que sea un 5% más bajo que el real para permitir carreras. No es el método ideal, pero el comportamiento predeterminado de ThreadPoolExecutor es impar y no puedo encontrar ninguna implementación externa de ThreadPools que tenga el comportamiento que describí. Si hay, consideraré cambiar la respuesta aceptada. –

2

Hemos encontrado una solución a ese problema con el siguiente código:

Esta cola es un híbrido SynchronousQueue/LinkedBlockingQueue.

public class OverflowingSynchronousQueue<E> extends LinkedBlockingQueue<E> { 
    private static final long serialVersionUID = 1L; 

    private SynchronousQueue<E> synchronousQueue = new SynchronousQueue<E>(); 

    public OverflowingSynchronousQueue() { 
    super(); 
    } 

    public OverflowingSynchronousQueue(int capacity) { 
    super(capacity); 
    } 

    @Override 
    public boolean offer(E e) { 
    // Create a new thread or wake an idled thread 
    return synchronousQueue.offer(e); 
    } 

    public boolean offerToOverflowingQueue(E e) { 
    // Add to queue 
    return super.offer(e); 
    } 

    @Override 
    public E take() throws InterruptedException { 
    // Return tasks from queue, if any, without blocking 
    E task = super.poll(); 
    if (task != null) { 
     return task; 
    } else { 
     // Block on the SynchronousQueue take 
     return synchronousQueue.take(); 
    } 
    } 

    @Override 
    public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
    // Return tasks from queue, if any, without blocking 
    E task = super.poll(); 
    if (task != null) { 
     return task; 
    } else { 
     // Block on the SynchronousQueue poll 
     return synchronousQueue.poll(timeout, unit); 
    } 
    } 

}

Para que funcione, tenemos que envolver el RejectedExecutionHandler llamar "offerToOverflowingQueue" cuando se rechaza una tarea.

public class OverflowingRejectionPolicyAdapter implements RejectedExecutionHandler { 

    private OverflowingSynchronousQueue<Runnable> queue; 
    private RejectedExecutionHandler adaptedRejectedExecutionHandler; 

    public OverflowingRejectionPolicyAdapter(OverflowingSynchronousQueue<Runnable> queue, 
              RejectedExecutionHandler adaptedRejectedExecutionHandler) 
    { 
    super(); 
    this.queue = queue; 
    this.adaptedRejectedExecutionHandler = adaptedRejectedExecutionHandler; 
    } 

    @Override 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    if (!queue.offerToOverflowingQueue(r)) { 
     adaptedRejectedExecutionHandler.rejectedExecution(r, executor); 
    } 
    } 
} 

Así es como creamos el ThreadPoolExecutor

public static ExecutorService newSaturatingThreadPool(int corePoolSize, 
                 int maxPoolSize, 
                 int maxQueueSize, 
                 long keepAliveTime, 
                 TimeUnit timeUnit, 
                 String threadNamePrefix, 
                 RejectedExecutionHandler rejectedExecutionHandler) 
    { 
    OverflowingSynchronousQueue<Runnable> queue = new OverflowingSynchronousQueue<Runnable>(maxQueueSize); 
    OverflowingRejectionPolicyAdapter rejectionPolicyAdapter = new OverflowingRejectionPolicyAdapter(queue, 
                            rejectedExecutionHandler); 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, 
                 maxPoolSize, 
                 keepAliveTime, 
                 timeUnit, 
                 queue, 
                 new NamedThreadFactory(threadNamePrefix), 
                 rejectionPolicyAdapter); 
    return executor; 
} 
Cuestiones relacionadas