2012-03-28 9 views
9

He estado jugando con diferentes estrategias para la agrupación de subprocesos utilizando ThreadPoolExecutor con JDK6. Tengo una cola de Prioridad funcionando, pero no estaba seguro de si me gustó cómo el grupo no se dimensionó después de keepAliveTime (lo que obtienes con una cola ilimitada). Entonces, estoy mirando un ThreadPoolExecutor usando LinkedBlockingQueue y la política CallerRuns.¿Por qué ThreadPoolExecutor reduce los subprocesos por debajo de corePoolSize después de keepAliveTime?

El problema que estoy teniendo ahora es que el grupo aumenta, ya que los documentos explican que debería hacerlo, pero después de que las tareas se completan y el keepAliveTime entra en juego getPoolSize muestra que el grupo se reduce a cero. a continuación el código de ejemplo debe permitir que vea la base de mi pregunta: ¿

public class ThreadPoolingDemo { 
    private final static Logger LOGGER = 
     Logger.getLogger(ThreadPoolingDemo.class.getName()); 

    public static void main(String[] args) throws Exception { 
     LOGGER.info("MAIN THREAD:starting"); 
     runCallerTestPlain(); 
    } 

    private static void runCallerTestPlain() throws InterruptedException { 
     //10 core threads, 
     //50 max pool size, 
     //100 tasks in queue, 
     //at max pool and full queue - caller runs task 
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50, 
      5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.CallerRunsPolicy()); 

     //dump 5000 tasks on the queue 
     for (int i = 0; i < 5000; i++) { 
      tpe.submit(new Runnable() { 
       @Override 
       public void run() { 
        //just to eat some time and give a little feedback 
        for (int j = 0; j < 20; j++) { 
         LOGGER.info("First-batch Task, looping:" + j + "[" 
           + Thread.currentThread().getId() + "]"); 
        } 
       } 
      }, null); 
     } 
     LOGGER.info("MAIN THREAD:!!Done queueing!!"); 

     //check tpe statistics forever 
     while (true) { 
      LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: " 
       + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize()); 
      Thread.sleep(1000); 
     } 
    } 
} 

me encontré con un viejo error que parece ser este tema pero estaba cerrado: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. ¿Podría estar presente en 1.6 o me falta algo?

Parece que I Rubber Ducked este (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). El error que he vinculado anteriormente está relacionado con este: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792, donde el problema parece resolverse en 1.7 (cargué 1.7 y verifiqué - solucioné ...). Creo que mi principal problema fue que un error tan fundamental se mantuvo durante casi una década. Pasé demasiado tiempo escribiendo esto para no publicarlo ahora, espero que ayude a alguien.

+0

+1 Un buen descubrimiento, me sorprendí a mí mismo al ver este comportamiento. –

+1

Quizás sería mejor estructurar tu publicación como una pregunta y luego proporcionar lo que has aprendido como respuesta. –

Respuesta

6

... después de las tareas completa y la KeepAliveTime entra en juego getPoolSize muestra la piscina consiguiendo reduce a cero.

Así que esto parece ser una condición de carrera en el ThreadPoolExecutor. Supongo que está funcionando de acuerdo con el diseño, aunque no esperado. En el método getTask() el que el bucle de subprocesos de trabajo a través de llegar tareas de la cola de bloqueo, se ve este código:

if (state == SHUTDOWN) // Help drain queue 
    r = workQueue.poll(); 
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
else 
    r = workQueue.take(); 
if (r != null) 
    return r; 
if (workerCanExit()) { 
    if (runState >= SHUTDOWN) // Wake up others 
     interruptIdleWorkers(); 
    return null; 
} 

Si el poolSize crece por encima de la corePoolSize continuación, si los tiempos de la encuesta de espera después de keepAliveTime, el código se cae a workerCanExit() desde r es null. Todos los hilos pueden volver true de ese método, ya que se acaba poniendo a prueba el estado de poolSize:

mainLock.lock(); 
    boolean canExit; 
    try { 
     canExit = runState >= STOP || 
      workQueue.isEmpty() || 
      (allowCoreThreadTimeOut && 
      poolSize > Math.max(1, corePoolSize)); << test poolSize here 
    } finally { 
     mainLock.unlock();       << race to workerDone() begins 
    } 

Una vez que devuelve true entonces las salidas de los subprocesos de trabajo y luego se disminuye la poolSize. Si todos los subprocesos de trabajo hacen esa prueba al mismo tiempo, todos saldrán debido a la competencia entre la prueba de poolSize y la detención del trabajador cuando se produce --poolSize.

Lo que me sorprende es cuán consistente es esa condición de carrera. Si agrega un poco de aleatoriedad al sleep() dentro del run() debajo, entonces puede obtener algunos hilos principales para no salir, pero hubiera pensado que la condición de carrera hubiera sido más difícil de alcanzar.


Puede ver este comportamiento en el siguiente ensayo:

@Test 
public void test() throws Exception { 
    int before = Thread.activeCount(); 
    int core = 10; 
    int max = 50; 
    int queueSize = 100; 
    ThreadPoolExecutor tpe = 
      new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(queueSize), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
    tpe.allowCoreThreadTimeOut(false); 
    assertEquals(0, tpe.getActiveCount()); 
    // if we start 1 more than can go into core or queue, poolSize goes to 0 
    int startN = core + queueSize + 1; 
    // if we only start jobs the core can take care of, then it won't go to 0 
    // int startN = core + queueSize; 
    for (int i = 0; i < startN; i++) { 
     tpe.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 
    while (true) { 
     System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize() 
       + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before)); 
     Thread.sleep(1000); 
    } 
} 

Si cambia la línea sleep dentro del método run() a algo como esto:

private final Random random = new Random(); 
... 
    Thread.sleep(100 + random.nextInt(100)); 

Esto hará la condición de carrera es más difícil de alcanzar, por lo que algunos hilos centrales seguirán existiendo.

+0

El OP está más interesado en 'getPoolSize()' y no 'getActiveCount()' Después de completar todas las tareas y el keepAliveTime expira, todos los hilos terminan aunque el TPE no lo haga. –

+0

@Gray mis parámetros de TPE son importantes. Esto no siempre falla. Usando los parámetros que usó, lo veremos funcionar como se espera. Los parámetros que especifiqué juegan con las reglas de TPE de una manera única pero también realista (es decir, un gran trabajo por lotes que se descarga en la cola). El principal problema con sus parámetros es que nunca desencadena la situación que hace que los subprocesos se agreguen al grupo por encima y más allá del tamaño del núcleo. Para superar/agregar más allá del tamaño del núcleo, el grupo debe estar en el tamaño del núcleo (10) y la cola debe haber superado su límite (100). Pruebe su ejemplo con 'for (int i = 0; i andematt

+0

@andematt Viaje interesante. Esta es una condición de carrera en 'ThreadPoolExecutor'. He editado mi respuesta. – Gray

Cuestiones relacionadas