2009-09-16 16 views
5

¿Hay alguna manera de crear Ejecutor que tendrá siempre al menos 5 hilos, y un máximo de 20 hilos, y la cola sin límites para las tareas (es decir, ninguna tarea es rechazada)especificando problema ThreadPoolExecutor

Probé nueva ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) con todas las posibilidades que pensé por cola:

new LinkedBlockingQueue() // never runs more than 5 threads 
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new SynchronousQueue() // no tasks can wait, after 20, they are rejected 

y ninguno trabajaron como quería.

+0

¿Qué quieres decir con que no funcionó? ¿Una excepción? – Gandalf

+4

que está demasiado cerca: ¡Gandalf y Sarmun! – akf

+0

Los Ents deberían estar aquí en cualquier momento. . . y dónde está ese molesto Eagle ... – Gandalf

Respuesta

5

Tal vez algo así podría funcionar para usted? Simplemente lo preparé, así que por favor empuja. Básicamente, se implementa un grupo de subprocesos de desbordamiento que se utiliza para alimentar el subyacente ThreadPoolExecutor

Hay dos grandes pegas que veo con ella:

  • La falta de un objeto futuro devuelta a la submit(). Pero tal vez eso no sea un problema para ti.
  • La cola secundaria solo se vaciará en el ThreadPoolExecutor cuando se envíen trabajos. Tiene que haber una solución elegante, pero aún no lo veo. Si sabe que habrá un flujo constante de tareas en el StusMagicExecutor, puede que esto no sea un problema. ("Mayo" es la palabra clave). ¿Una opción podría ser hacer que las tareas enviadas ingresen al StusMagicExecutor después de que se completen?

magia Ejecutor de Stu:

public class StusMagicExecutor extends ThreadPoolExecutor { 
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE. 

    public StusMagicExecutor() { 
     super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler()); 
    } 
    public void queueRejectedTask(Runnable task) { 
     try { 
      secondaryQueue.put(task); 
     } catch (InterruptedException e) { 
      // do something 
     } 
    } 
    public Future submit(Runnable newTask) { 
     //drain secondary queue as rejection handler populates it 
     Collection<Runnable> tasks = new ArrayList<Runnable>(); 
     secondaryQueue.drainTo(tasks); 

     tasks.add(newTask); 

     for (Runnable task : tasks) 
      super.submit(task); 

     return null; //does not return a future! 
    } 
} 

class RejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 
     ((StusMagicExecutor)executor).queueRejectedTask(runnable); 
    } 
} 
+1

Genial, tnx, esto es más o menos lo que quería. Con el ajuste newTasks con runnable que ejecuta una tarea después de finalizar, solo un hilo de cada 20 podría estar inactivo mientras tenemos tareas en la cola secundaria. – Sarmun

1

Los javadocs para ThreadPoolExecutor son bastante claros, una vez que se han creado los hilos corePoolSize, los hilos nuevos solo se crearán una vez que la cola esté llena. Por lo tanto, si establece core en 5 y max en 20, nunca obtendrá el comportamiento deseado.

Sin embargo, si configura core y max en 20, las tareas solo se agregarán a la cola si los 20 hilos están ocupados. Por supuesto, esto hace que el requisito de "5 hilos mínimo" no tenga sentido, ya que los 20 se mantendrán vivos (hasta que dejen de funcionar, de todos modos).

+0

"inactivo" nunca sucederá, a menos que diga que todos los hilos centrales pueden morir. En cualquier caso, no vea el punto de tamaño central y máximo si no se puede usar correctamente. ¿Hay alguna otra clase (aparte de ThreadPoolExecutor) que cumpla con mis requisitos? – Sarmun

1

creo que este problema es un defecto de la clase y muy engañosa dada las combinaciones de parámetros de constructor. Aquí hay una solución tomada desde el ThreadPoolExecutor interno de SwingWorker que convertí en una clase de nivel superior. No tiene un mínimo, pero al menos usa un límite superior. Lo único que no sé es qué rendimiento obtiene al ejecutar el bloqueo.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { 
    private final ReentrantLock pauseLock = new ReentrantLock(); 
    private final Condition unpaused = pauseLock.newCondition(); 
    private boolean isPaused = false; 
    private final ReentrantLock executeLock = new ReentrantLock(); 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       handler); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, 
      RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory, handler); 
    } 

    @Override 
    public void execute(Runnable command) { 
     executeLock.lock(); 
     try { 
      pauseLock.lock(); 
      try { 
       isPaused = true; 
      } finally { 
       pauseLock.unlock(); 
      } 
      setCorePoolSize(getMaximumPoolSize()); 
      super.execute(command); 
      setCorePoolSize(0); 
      pauseLock.lock(); 
      try { 
       isPaused = false; 
       unpaused.signalAll(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } finally { 
      executeLock.unlock(); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     pauseLock.lock(); 
     try { 
      while (isPaused) { 
       unpaused.await(); 
      } 
     } catch (InterruptedException ignore) { 

     } finally { 
      pauseLock.unlock(); 
     } 
    } 
} 
Cuestiones relacionadas