2009-09-08 20 views
13

Encontró una situación cuando ThreadPoolExecutor está estacionado en la función execute(Runnable) mientras que todos los ThreadPool hilos están esperando en getTask func, workQueue is empty.Punto muerto en ThreadPoolExecutor

¿Alguien tiene alguna idea?

El ThreadPoolExecutor se crea con ArrayBlockingQueue, y corePoolSize == maximumPoolSize = 4

[Editar] Para ser más precisos, el hilo se bloquea en ThreadPoolExecutor.exec(Runnable command) func. Tiene la tarea de ejecutar, pero no lo hace.

[Editar2] El ejecutor está bloqueado en algún lugar dentro de la cola de trabajo (ArrayBlockingQueue).

[Edit3] La pila de llamadas:

thread = front_end(224) 
at sun.misc.Unsafe.park(Native methord) 
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778) 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114) 
at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186) 
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262) 
at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224) 
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653) 
at net.listenThread.WorkersPool.execute(WorkersPool.java:45) 

al mismo tiempo que el WorkQueue está vacía (comprobado mediante depuración remota)

[Edit4] Código trabajar con ThreadPoolExecutor:

public WorkersPool(int size) { 
    pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY), 
     new ThreadFactory() { 
     @NotNull 
     private final AtomicInteger threadsCount = new AtomicInteger(0); 

     @NotNull 
     public Thread newThread(@NotNull Runnable r) { 
      final Thread thread = new Thread(r); 
      thread.setName("net_worker_" + threadsCount.incrementAndGet()); 
      return thread; 
     } 
     }, 

     new RejectedExecutionHandler() { 
     public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) { 
      Verify.warning("new task " + r + " is discarded"); 
     } 
     }); 
    } 

    public void execute(@NotNull Runnable task) { 
    pool.execute(task); 
    } 

    public void stopWorkers() throws WorkersTerminationFailedException { 
    pool.shutdownNow(); 
    try { 
     pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS); 
    } catch (InterruptedException e) { 
     throw new WorkersTerminationFailedException("Workers-pool termination failed", e); 
    } 
    } 
} 
+0

¿Cuál es la naturaleza de la tarea que se pasa a TPE.execute() func.? Si la tarea tiene acceso a TPE, este podría ser su problema. – artemv

+1

Creo que tengo un problema comparable el 1.7.0_13.El proceso comienza y funciona sin problemas ... y luego en algún momento tengo ~~ 200 tareas pero mi cola de bloqueo está vacía. El tamaño del grupo principal es 3 ... Estoy usando ArrayBlockingQueue también ... – cljk

Respuesta

2

No veo ningún bloqueo en el código de ThreadPoolExecutor 's execute(Runnable). La única variable es el workQueue. ¿Qué tipo de BlockingQueue proporcionó a su ThreadPoolExecutor?

Sobre el tema de puntos muertos:

Puede confirmar esto es un callejón sin salida examinando el volcado de rosca completa, según lo dispuesto por <ctrl><break> en Windows o kill -QUIT en los sistemas UNIX.

Una vez que tenga esos datos, puede examinar los hilos. Aquí es un extracto pertinente de Sun's article on examining thread dumps (suggested reading):

para colgar, en punto muerto o programas congelados: Si cree que su programa está colgando, generar un seguimiento de pila y examinar los hilos en los estados MW o CW. Si el programa está bloqueado, es probable que algunos de los hilos del sistema aparezcan como los hilos actuales, porque no hay nada más que pueda hacer la JVM.

En una nota más clara: si está ejecutando en un IDE, puede asegurarse de que no hay puntos de interrupción habilitados en estos métodos.

+0

Como escribí en mi pregunta, se usa ArrayBlockingQueue. Y está vacío. Sí, el hilo está bloqueando en algún lugar de la cola de trabajo. – Vitaly

+0

Utilicé depuración remota. Se editó la pregunta: se agregó la pila de llamadas. – Vitaly

+0

También puede verificar si hay interbloqueos con JConsole – pjp

0

Como alguien ya lo mencionó, esto suena como un comportamiento normal, el ThreadPoolExecutor solo está esperando para hacer algo de trabajo. Si desea detenerlo, es necesario llamar a:

executor.shutdown()

conseguirlo para terminar, por lo general seguido de un ejecutor.awaitTermination

+0

He editado la pregunta – Vitaly

0

La biblioteca de código fuente está por debajo (que es de hecho una clase de http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip),
- un poco complicado - una protección adicional contra las repetidas llamadas de FutureTask si no me equivoco -, pero no parece ser propensos estancamiento - Uso muy simple de ThreadPool:

package net.spy.memcached.transcoders; 

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.Future; 
import java.util.concurrent.FutureTask; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.TimeoutException; 
import java.util.concurrent.atomic.AtomicBoolean; 

import net.spy.memcached.CachedData; 
import net.spy.memcached.compat.SpyObject; 

/** 
* Asynchronous transcoder. 
*/ 
public class TranscodeService extends SpyObject { 

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L, 
      TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.DiscardPolicy()); 

    /** 
    * Perform a decode. 
    */ 
    public <T> Future<T> decode(final Transcoder<T> tc, 
      final CachedData cachedData) { 

     assert !pool.isShutdown() : "Pool has already shut down."; 

     TranscodeService.Task<T> task = new TranscodeService.Task<T>(
       new Callable<T>() { 
        public T call() { 
         return tc.decode(cachedData); 
        } 
       }); 

     if (tc.asyncDecode(cachedData)) { 
      this.pool.execute(task); 
     } 
     return task; 
    } 

    /** 
    * Shut down the pool. 
    */ 
    public void shutdown() { 
     pool.shutdown(); 
    } 

    /** 
    * Ask whether this service has been shut down. 
    */ 
    public boolean isShutdown() { 
     return pool.isShutdown(); 
    } 

    private static class Task<T> extends FutureTask<T> { 
     private final AtomicBoolean isRunning = new AtomicBoolean(false); 

     public Task(Callable<T> callable) { 
      super(callable); 
     } 

     @Override 
     public T get() throws InterruptedException, ExecutionException { 
      this.run(); 
      return super.get(); 
     } 

     @Override 
     public T get(long timeout, TimeUnit unit) throws InterruptedException, 
       ExecutionException, TimeoutException { 
      this.run(); 
      return super.get(timeout, unit); 
     } 

     @Override 
     public void run() { 
      if (this.isRunning.compareAndSet(false, true)) { 
       super.run(); 
      } 
     } 
    } 

} 
0

Definitivamente extraño.

Pero antes de escribir su propio intento TPE:.

  • otra BlockingQueue impl, por ejemplo, LinkedBlockingQueue

  • especificar la imparcialidad = true en ArrayBlockingQueue, es decir, usar new ArrayBlockingQueue(n, true)

A partir de esos dos opta volvería a elegir segundo porque es muy extraño que offer() siendo bloqueada; Una razón que viene a la mente: la política de programación de hilos en su Linux. Solo como una suposición.

7

Parece que es un error con una JVM anterior a 6u21. Hubo un problema en el código nativo compilado para algunos sistemas operativos (quizás todos).

Desde el enlace:

El error es causado por la falta de barreras de memoria con distintos Parker :: parque() caminos que pueden dar lugar a activaciones y cuelga perdidos. (Tenga en cuenta que PlatformEvent :: park utilizado por la sincronización incorporada no es vulnerable al problema). -XX: + UseMembar constituye una solución alternativa porque la barrera membar en la lógica de transición de estado oculta el problema en Parker ::. (es decir, no hay nada de malo con el mecanismo de uso -UseMembar , pero + UseMembar oculta el error Parker: :). Este es un error del día uno introducido con la adición de java.util.concurrent en JDK 5.0. Desarrollé un modo C simple de la falla y parece más probable que se manifieste en las plataformas AMD y Nehalem modernas, probablemente debido a los buffers de la tienda más profundos que tardan más en drenar. Proporcioné una solución provisional a Doug Lea para Parker :: park que parece eliminar el error. Voy a entregando esta corrección al tiempo de ejecución. (También aumentaré el CR con casos de prueba adicionales y una explicación más larga). Es probable que sea un buen candidato para back-ports.

Enlace: JVM Bug

Soluciones provisionales están disponibles, pero probablemente sería mejor fuera sólo conseguir la copia más reciente de Java.

+1

Actualicé a 'build 1.6.0_27-b07' (ejecutándose en Solaris 10 SPARC), pero aún no resuelve el problema. My Jboss ESB sigue creando miles de hilos y no los cierra. –

1

Este punto muerto probablemente se deba a que ejecuta la tarea desde el ejecutor. Por ejemplo, envía una tarea y esta dispara otras 4 tareas. Si el tamaño de la agrupación es igual a 4, se desbordará por completo y la última tarea esperará hasta que alguien de la tarea devuelva el valor. Pero la primera tarea es esperar a que se completen todas las tareas bifurcadas.

Cuestiones relacionadas