2010-06-17 17 views
9

Estoy trabajando en alguna aplicación y estoy usando ThreadPoolExecutor para manejar varias tareas. ThreadPoolExecutor se estanca después de un tiempo. Para simular esto en un entorno más simple, he escrito un código simple donde puedo simular el problema.Java ThreadPoolExecutor atorado al usar ArrayBlockingQueue

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.RejectedExecutionHandler; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class MyThreadPoolExecutor { 
    private int poolSize = 10; 
    private int maxPoolSize = 50; 
    private long keepAliveTime = 10; 
    private ThreadPoolExecutor threadPool = null; 
    private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
      100000); 

    public MyThreadPoolExecutor() { 
     threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize, 
       keepAliveTime, TimeUnit.SECONDS, queue); 
     threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 

      @Override 
      public void rejectedExecution(Runnable runnable, 
        ThreadPoolExecutor threadPoolExecutor) { 
       System.out 
         .println("Execution rejected. Please try restarting the application."); 
      } 

     }); 
    } 

    public void runTask(Runnable task) { 
     threadPool.execute(task); 
    } 

    public void shutDown() { 
     threadPool.shutdownNow(); 
    } 
    public ThreadPoolExecutor getThreadPool() { 
     return threadPool; 
    } 

    public void setThreadPool(ThreadPoolExecutor threadPool) { 
     this.threadPool = threadPool; 
    } 

    public static void main(String[] args) { 
     MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor(); 
     for (int i = 0; i < 1000; i++) { 
      final int j = i; 
      mtpe.runTask(new Runnable() { 

       @Override 
       public void run() { 
        System.out.println(j); 
       } 

      }); 
     } 
    } 
} 

Intente ejecutar este código varias veces. Normalmente imprime el número en la consola y cuando todos los hilos terminan, existe. Pero a veces, terminó todas las tareas y luego no se termina. El vertedero de hilo es el siguiente:

MyThreadPoolExecutor [Java Application] 
    MyThreadPoolExecutor at localhost:2619 (Suspended) 
     Daemon System Thread [Attach Listener] (Suspended) 
     Daemon System Thread [Signal Dispatcher] (Suspended)  
     Daemon System Thread [Finalizer] (Suspended)  
      Object.wait(long) line: not available [native method] 
      ReferenceQueue<T>.remove(long) line: not available  
      ReferenceQueue<T>.remove() line: not available  
      Finalizer$FinalizerThread.run() line: not available 
     Daemon System Thread [Reference Handler] (Suspended)  
      Object.wait(long) line: not available [native method] 
      Reference$Lock(Object).wait() line: 485 
      Reference$ReferenceHandler.run() line: not available  
     Thread [pool-1-thread-1] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-2] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-3] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-4] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-6] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-8] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-5] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-10] (Suspended) 
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-9] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [pool-1-thread-7] (Suspended)  
      Unsafe.park(boolean, long) line: not available [native method] 
      LockSupport.park(Object) line: not available  
      AbstractQueuedSynchronizer$ConditionObject.await() line: not available 
      ArrayBlockingQueue<E>.take() line: not available 
      ThreadPoolExecutor.getTask() line: not available  
      ThreadPoolExecutor$Worker.run() line: not available 
      Thread.run() line: not available  
     Thread [DestroyJavaVM] (Suspended) 
    C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM) 

En mi aplicación real, hilos ThreadPoolExecutor ir en este estado y luego deja de responder.

Saludos, Ravi Rao

+0

Una idea es que pruebes ExecutorService, uno de mis favoritos. –

+0

@Lars Andren, A ThreadPoolExecutor es un ExecutorService. ExecutorService es simplemente una interfaz. En la biblioteca Java 1.5, ThreadPoolExecutor es la única implementación directa de la interfaz ExecutorService. Hay un AbstractExecutorService y un DelegatedExecutorService que no son clases funcionales independientes. Además, hay una interfaz SheceduledExecutorService que extiende ExecutorService y tiene una única implementación concreta. –

Respuesta

9

En el método de main, nunca se llaman mtpe.shutdown(). A ThreadPoolExecutor intentará mantener su corePoolSize con vida indefinidamente. A veces, tienes suerte y tienes más de corePoolSize hilos vivos, por lo que cada hilo de trabajo entrará en una rama de lógica condicional que le permite terminar después de su período de tiempo de espera especificado de 10 segundos. Sin embargo, como habrás notado, a veces este no es el caso, por lo que cada subproceso en el ejecutor se bloqueará en ArrayBlockingQueue.take() y esperará una nueva tarea.

Además, tenga en cuenta que hay una diferencia significativa entre ExecutorService.shutdown() y ExecutorService.shutdownNow(). Si llama a ExecutorService.shutdownNow() como lo indica la implementación de su envoltorio, ocasionalmente abandonará algunas tareas que no se han asignado para su ejecución.

Actualización: Desde mi respuesta original, la implementación ThreadPoolExecutor ha cambiado de modo que el programa en la publicación original nunca debería salir.

Cuestiones relacionadas