2010-08-05 26 views
13

Estoy tratando de utilizar un ThreadPoolExecutor para programar tareas, pero se encuentra con algunos problemas con sus políticas. Este es el comportamiento indicado:política ThreadPoolExecutor

  1. Si se están ejecutando menos hilos corePoolSize, el Ejecutor siempre prefiere agregar un nuevo hilo en lugar de hacer cola.
  2. Si corePoolSize o más subprocesos se están ejecutando, el Ejecutor siempre prefiere poner en cola una solicitud en lugar de agregar un nuevo subproceso.
  3. Si no se puede poner en cola una solicitud, se crea un nuevo subproceso a menos que exceda el máximo dePoolsize, en cuyo caso, la tarea será rechazada.

El comportamiento que quiero es esto:

  1. mismo que el anterior
  2. Si se están ejecutando más de corePoolSize pero menos de hilos maximumPoolSize, prefiere la adición de un nuevo hilo sobre la puesta en cola, y el uso de un subproceso inactivo sobre agregar un nuevo hilo.
  3. mismo que el anterior

Básicamente no quiero ninguna tarea a ser rechazados; Quiero que estén en cola en una fila ilimitada. Pero sí quiero tener hasta el máximo de subprocesos de Poolsize. Si uso una cola ilimitada, nunca genera subprocesos después de que llegue a coreSize. Si uso una cola acotada, rechaza las tareas. ¿Hay alguna forma de evitar esto?

Lo que estoy pensando ahora es ejecutar el ThreadPoolExecutor en una Cola Sincrónica, pero sin cargar las tareas directamente en él, en lugar de alimentarlas a una LinkedBlockingQueue independiente no acotada. Luego, otro hilo se carga desde LinkedBlockingQueue en el Ejecutor, y si uno es rechazado, simplemente lo intenta de nuevo hasta que no sea rechazado. Sin embargo, esto parece una molestia y un poco complicado, ¿hay alguna manera más clara de hacerlo?

Respuesta

1

Su caso de uso es común, completamente de fiar y, por desgracia, más difícil de lo que cabría esperar. Para información de fondo puede leer this discussion y encontrar un puntero a una solución (también mencionado en el hilo) here. La solución de Shay funciona bien.

En general, sería un poco cauteloso con las colas ilimitadas; Por lo general, es mejor contar con un control de flujo de entrada explícito que se degrade graciosamente y regule la proporción de trabajo actual/restante para no abrumar ni al productor ni al consumidor.

1

¿Acaba de establecer corePoolsize = maximumPoolSize y utiliza una cola ilimitada?

En su lista de puntos, 1 excluye 2, ya que corePoolSize siempre será menor o igual que maximumPoolSize.

Editar

Todavía hay algo incompatibles entre lo que quiere y lo que va a ofrecer TPE.

Si tiene una cola sin delimitar, maximumPoolSize se ignora, por lo que, como observó, nunca se crearán y utilizarán más de corePoolSize hilos.

Así que, de nuevo, si toma corePoolsize = maximumPoolSize con una cola ilimitada, tiene lo que desea, ¿no?

+0

Vaya, lo que escribí no era exactamente lo que quería. Edité el original. –

+0

Configuración de corePoolsize = maximumPoolSize de hecho está cerca, pero también estoy usando allowCoreThreadTimeOut (false) y prestartAllCoreThreads(). –

4

probablemente no es necesario micro-gestión de la agrupación de hebras que se solicita.

Un grupo de subprocesos en caché reutilizará subprocesos inactivos al tiempo que permite subprocesos simultáneos potencialmente ilimitados.Esto, por supuesto, podría conducir a un rendimiento fuera de control que se degrada desde la sobrecarga de cambio de contexto durante los períodos de ráfagas.

Executors.newCachedThreadPool(); 

Una mejor opción es colocar un límite en el número total de hilos, pero liberada por la idea de asegurar hebras en espera se utilizó por primera vez. Los cambios de configuración serían:

corePoolSize = maximumPoolSize = N; 
allowCoreThreadTimeOut(true); 
setKeepAliveTime(aReasonableTimeDuration, TimeUnit.SECONDS); 

Razonamiento sobre este escenario, si el ejecutor tiene menos de corePoolSize hilos, de lo que no debe estar muy ocupado. Si el sistema no está muy ocupado, entonces hay poco daño al girar un nuevo hilo. Hacer esto hará que su ThreadPoolExecutor siempre cree un nuevo trabajador si está por debajo de la cantidad máxima de trabajadores permitidos. Solo cuando el número máximo de trabajadores está "en funcionamiento", los trabajadores que esperan ociosamente las tareas recibirán tareas. Si un trabajador espera aReasonableTimeDuration sin una tarea, entonces puede terminar. Usando límites razonables para el tamaño de la agrupación (después de todo, hay solo muchas CPU) y un tiempo de espera razonablemente grande (para evitar que los hilos terminen innecesariamente), es probable que se vean los beneficios deseados.

La última opción es hackish. Básicamente, el ThreadPoolExecutor internamente usa BlockingQueue.offer para determinar si la cola tiene capacidad. Una implementación personalizada de BlockingQueue siempre podría rechazar el intento offer. Cuando el ThreadPoolExecutor falla al offer una tarea en la cola, intentará crear un nuevo trabajador. Si no se puede crear un nuevo trabajador, se llamaría a RejectedExecutionHandler. En ese punto, un RejectedExecutionHandler personalizado podría forzar un put en el BlockingQueue personalizado.

/** Hackish BlockingQueue Implementation tightly coupled to ThreadPoolexecutor implementation details. */ 
class ThreadPoolHackyBlockingQueue<T> implements BlockingQueue<T>, RejectedExecutionHandler { 
    BlockingQueue<T> delegate; 

    public boolean offer(T item) { 
     return false; 
    } 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     delegate.put(r); 
    } 

    //.... delegate methods 
} 
Cuestiones relacionadas