Nota: estas implementaciones son algo defectuoso y no determinista. Lea la respuesta completa y los comentarios antes de usar este código.
¿Qué le parece crear una cola de trabajos que rechaza elementos mientras el ejecutor está por debajo del tamaño máximo de la agrupación y comienza a aceptarlos una vez que se alcanza el máximo?
Esto se basa en el comportamiento documentado:
"Si la solicitud no puede ponerse en cola, se crea un nuevo hilo a menos que esta excedería maximumPoolSize, en cuyo caso, la tarea será rechazaron"
public class ExecutorTest
{
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
private static final int KEEP_ALIVE_TIME_MS = 5000;
public static void main(String[] args)
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue();
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 6; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread());
}
});
}
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private ThreadPoolExecutor executor;
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (executor.getPoolSize() < executor.getMaximumPoolSize())
{
return false;
}
return super.offer(e);
}
}
}
Nota: Su pregunta me sorprendió porque esperaba que su comportamiento deseado a ser el comportamiento predeterminado de una ThreadPoolExecutor configurado con un corePoolSize < maximumPoolSize. Pero como usted señala, el JavaDoc para ThreadPoolExecutor establece claramente lo contrario.
Idea # 2
creo que tengo lo que es probablemente un enfoque ligeramente mejor. Se basa en el comportamiento de efectos secundarios codificado en el método setCorePoolSize
en ThreadPoolExecutor
. La idea es aumentar de forma temporal y condicional el tamaño de la agrupación de núcleos cuando se pone en cola un elemento de trabajo. Al aumentar el tamaño del grupo de núcleos, el ThreadPoolExecutor
generará de inmediato suficientes hilos nuevos para ejecutar todas las tareas en cola (queue.size()). Luego, disminuimos de inmediato el tamaño del grupo de núcleos, lo que permite que el grupo de subprocesos se reduzca de forma natural durante períodos futuros de baja actividad. Este enfoque aún no es completamente determinista (es posible que el tamaño del grupo crezca por encima del tamaño máximo de la agrupación, por ejemplo), pero creo que en casi todos los casos es mejor que la primera estrategia.
En concreto, creo que este enfoque es mejor que la primera porque:
- Se reutilizará las discusiones más a menudo
- No rechazará la ejecución como resultado de una carrera
- me gustaría Mencione nuevamente que el primer enfoque hace que el grupo de subprocesos crezca a su tamaño máximo incluso bajo un uso muy ligero. Este enfoque debería ser mucho más eficiente en ese sentido.
-
public class ExecutorTest2
{
private static final int KEEP_ALIVE_TIME_MS = 5000;
private static final int CORE_POOL_SIZE = 2;
private static final int MAXIMUM_POOL_SIZE = 4;
public static void main(String[] args) throws InterruptedException
{
final SaturateExecutorBlockingQueue workQueue =
new SaturateExecutorBlockingQueue(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE);
final ThreadPoolExecutor executor =
new ThreadPoolExecutor(CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME_MS,
TimeUnit.MILLISECONDS,
workQueue);
workQueue.setExecutor(executor);
for (int i = 0; i < 60; i++)
{
final int index = i;
executor.submit(new Runnable()
{
public void run()
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Runnable " + index
+ " on thread: " + Thread.currentThread()
+ " poolSize: " + executor.getPoolSize());
}
});
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
public static class SaturateExecutorBlockingQueue
extends LinkedBlockingQueue<Runnable>
{
private final int corePoolSize;
private final int maximumPoolSize;
private ThreadPoolExecutor executor;
public SaturateExecutorBlockingQueue(int corePoolSize,
int maximumPoolSize)
{
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
}
public void setExecutor(ThreadPoolExecutor executor)
{
this.executor = executor;
}
public boolean offer(Runnable e)
{
if (super.offer(e) == false)
{
return false;
}
// Uncomment one or both of the below lines to increase
// the likelyhood of the threadpool reusing an existing thread
// vs. spawning a new one.
//Thread.yield();
//Thread.sleep(0);
int currentPoolSize = executor.getPoolSize();
if (currentPoolSize < maximumPoolSize
&& currentPoolSize >= corePoolSize)
{
executor.setCorePoolSize(currentPoolSize + 1);
executor.setCorePoolSize(corePoolSize);
}
return true;
}
}
}
Posible duplicado: http://stackoverflow.com/questions/1800317/impossible-to-make-a-cached-thread-pool-with- a-size-limit – mnicky