2012-05-26 19 views
5

empecé a leer más sobre ThreadPoolExecutor desde Java Doc como lo estoy usando en una de mi proyecto. Entonces, ¿alguien puede explicarme qué significa realmente esta línea? Sé lo que significa cada parámetro, pero quería entenderlo de una manera más general/laica de algunos de los expertos aquí.ThreadPoolExecutor con ArrayBlockingQueue

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L, 
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true), new 
ThreadPoolExecutor.CallerRunsPolicy()); 

Actualizado: - Planteamiento del problema es: -

Cada hilo utiliza el ID único entre 1 y 1000 y el programa tiene que correr durante 60 minutos o más, así que en ese 60 minutos es posible que todos los ID se terminen, así que necesito reutilizar esos ID nuevamente. Así que este es el programa a continuación que escribí usando el ejecutor anterior.

class IdPool { 
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>(); 

    public IdPool() { 
     for (int i = 1; i <= 1000; i++) { 
      availableExistingIds.add(i); 
     } 
    } 

    public synchronized Integer getExistingId() { 
     return availableExistingIds.removeFirst(); 
    } 

    public synchronized void releaseExistingId(Integer id) { 
     availableExistingIds.add(id); 
    } 
} 


class ThreadNewTask implements Runnable { 
    private IdPool idPool; 

    public ThreadNewTask(IdPool idPool) { 
     this.idPool = idPool; 
    } 

    public void run() { 
     Integer id = idPool.getExistingId(); 
     someMethod(id); 
     idPool.releaseExistingId(id); 
    } 

// This method needs to be synchronized or not? 
    private synchronized void someMethod(Integer id) { 
     System.out.println("Task: " +id); 
// and do other calcuations whatever you need to do in your program 
    } 
} 

public class TestingPool { 
    public static void main(String[] args) throws InterruptedException { 
     int size = 10; 
     int durationOfRun = 60; 
     IdPool idPool = new IdPool(); 
     // create thread pool with given size 
     ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

     // queue some tasks 
     long startTime = System.currentTimeMillis(); 
     long endTime = startTime + (durationOfRun * 60 * 1000L); 

     // Running it for 60 minutes 
     while(System.currentTimeMillis() <= endTime) { 
      service.submit(new ThreadNewTask(idPool)); 
     } 

     // wait for termination   
     service.shutdown(); 
     service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    } 
} 

Mi pregunta es: - Este código es correcto por lo que el rendimiento se considera o no? ¿Y qué más puedo hacer aquí para hacerlo más preciso? Cualquier ayuda será apreciada.

+0

No veo nada malo en su código, con excepción quizá de tener un try/finally en su método de "correr" para asegurarse de que ID siempre ha ser liberado (para cuando usted tiene un código más complicado en "algunMetodo "). – Matt

+0

@Matt, gracias por comentar, ¿entonces algúnMétodo tiene que estar sincronizado? ¿o no? Como en mi caso, lo hice sincronizado. – ferhan

+0

Y 'idPool.releaseExistingId (id);' debería aparecer en el bloque finally ¿no? – ferhan

Respuesta

8

[En primer lugar, me disculpo, esta es una respuesta a una pregunta anterior, pero quería dar formato].

Excepto en realidad, no es un elemento de bloque cuando se somete a un ThreadPoolExecutor con una cola llena. La razón de esto es que ThreadPoolExecutor llama al método BlockingQueue.offer (T item) que, por definición, es un método no bloqueante. Agrega el elemento y devuelve verdadero, o no agrega (cuando está lleno) y devuelve falso. El ThreadPoolExecutor llama al RejectedExecutionHandler registrado para tratar esta situación.

Desde el javadoc:

ejecuta la tarea dada en algún momento en el futuro. La tarea puede ejecutar en un nuevo hilo o en un hilo agrupado existente. Si la tarea no se puede presentado para su ejecución, ya sea porque esta ejecutor ha sido apagado o porque se ha alcanzado su capacidad, la tarea es manejado por el RejectedExecutionHandler actual.

Por defecto, el ThreadPoolExecutor.AbortPolicy() se utiliza lo que arroja un RejectedExecutionException del método de "enviar" o "ejecutar" de la ThreadPoolExecutor.

try { 
    executorService.execute(new Runnable() { ... }); 
} 
catch (RejectedExecutionException e) { 
    // the queue is full, and you're using the AbortPolicy as the 
    // RejectedExecutionHandler 
} 

Sin embargo, puede utilizar otros manipuladores de hacer algo diferente, como ignorar el error (DiscardPolicy), o bien puede hacerlo en el hilo que llama el método de "ejecutar" o "enviar" (CallerRunsPolicy). Este ejemplo permite que el hilo que llame al método "enviar" o "ejecutar" ejecute la tarea solicitada cuando la cola está llena. (Esto significa que en un momento dado, podría 1 cosa adicional que se ejecuta en la parte superior de lo que hay en la propia piscina):

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy()); 

Si desea bloquear y esperar, se podría implementar su propio RejectedExecutionHandler que bloquearía hasta que haya una ranura disponible en la cola (esto es una estimación aproximada, no he compilado o ejecutar esto, pero usted debe tener una idea):

public class BlockUntilAvailableSlot implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
    if (e.isTerminated() || e.isShutdown()) { 
     return; 
    } 

    boolean submitted = false; 
    while (! submitted) { 
     if (Thread.currentThread().isInterrupted()) { 
      // be a good citizen and do something nice if we were interrupted 
      // anywhere other than during the sleep method. 
     } 

     try { 
      e.execute(r); 
      submitted = true; 
     } 
     catch (RejectedExceptionException e) { 
     try { 
      // Sleep for a little bit, and try again. 
      Thread.sleep(100L); 
     } 
     catch (InterruptedException e) { 
      ; // do you care if someone called Thread.interrupt? 
      // if so, do something nice here, and maybe just silently return. 
     } 
     } 
    } 
    } 
} 
+0

No entiendo mucho de lo que acaba de decir, ya que soy nuevo en la familia Ejecutora. Entonces, ¿cómo estoy en mi pregunta es correcto o no? O puede haber alguna mejora en el código? – ferhan

+0

Simplemente estaba comentando sobre la afirmación de alguien de que "Utiliza un ArrayBlockingQueue para administrar las solicitudes de ejecución con 10 espacios, por lo que cuando la cola está llena (después de que 10 hilos han sido en cola), bloqueará a la persona que llama". Este no es el caso, como he explicado anteriormente, a menos que deliberadamente hagas algún código para que esto suceda. Al utilizar la política de ejecución de llamadas, la ejecuta en el hilo actual, esta afirmación anterior implica que bloqueará hasta que haya espacio en la cola. – Matt

2

Se trata de crear un ExecutorService que se ocupa de la ejecución de un grupo de subprocesos. Tanto el número inicial como el máximo de subprocesos en el grupo son 10 en este caso. Cuando un hilo en el grupo queda inactivo durante 1 segundo (1000ms), lo mata (el temporizador inactivo), sin embargo, dado que el número máximo y central de hilos es el mismo, esto nunca sucederá (siempre mantiene 10 hilos alrededor y lo hará). nunca ejecute más de 10 hilos).

Utiliza un ArrayBlockingQueue para gestionar las solicitudes de ejecución con 10 ranuras, por lo que cuando la cola está llena (después de que 10 hilos hayan sido en cola), bloqueará a la persona que llama.

Si se rechaza el hilo (que en este caso sería debido al cierre del servicio, ya que los hilos se pondrán en cola o se bloqueará al poner en cola un hilo si la cola está llena), se ejecutará el Runnable en el hilo de la persona que llama

+0

Gracias por el comentario, y ¿qué significa eso verdadero en ArrayBlockingQueue? – ferhan

+0

Tiene un número fijo de ranuras respaldadas por una matriz y bloqueará a la persona que llama al encuestar si están llenas. Hay otros tipos de colas que se pueden usar. Mira la descripción de esto en el Javadoc para más información. –

+0

Actualicé mi pregunta en la que publiqué mi código, y también mencioné el enunciado de mi problema, ¿hay algún problema en ese código? Si es así, ¿cómo puedo hacerlo más preciso? Cualquier ayuda será apreciada. – ferhan

0

Considere semáforos. Estos son para el mismo propósito. Por favor, verifique a continuación el código con semáforo. No estoy seguro de si esto es lo que quieres. Pero esto se bloqueará si no hay más permisos para adquirir. También es ID importante para ti?

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Semaphore; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

class ThreadNewTask implements Runnable { 
    private Semaphore idPool; 

    public ThreadNewTask(Semaphore idPool) { 
     this.idPool = idPool; 
    } 

    public void run() { 
//  Integer id = idPool.getExistingId(); 
     try { 
      idPool.acquire(); 
      someMethod(0); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } finally { 
      idPool.release(); 
     } 
//  idPool.releaseExistingId(id); 
    } 

    // This method needs to be synchronized or not? 
    private void someMethod(Integer id) { 
     System.out.println("Task: " + id); 
     // and do other calcuations whatever you need to do in your program 
    } 
} 

public class TestingPool { 
    public static void main(String[] args) throws InterruptedException { 
     int size = 10; 
     int durationOfRun = 60; 
     Semaphore idPool = new Semaphore(100); 
//  IdPool idPool = new IdPool(); 
     // create thread pool with given size 
     ExecutorService service = new ThreadPoolExecutor(size, size, 500L, 
       TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), 
       new ThreadPoolExecutor.CallerRunsPolicy()); 

     // queue some tasks 
     long startTime = System.currentTimeMillis(); 
     long endTime = startTime + (durationOfRun * 60 * 1000L); 

     // Running it for 60 minutes 
     while (System.currentTimeMillis() <= endTime) { 
      service.submit(new ThreadNewTask(idPool)); 
     } 

     // wait for termination 
     service.shutdown(); 
     service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    } 
} 
+0

¿Me puede dar un ejemplo basado en mi declaración de problema? – ferhan

+0

Sí ID es importante para mí y necesito pasar el ID al método para que cada subproceso use un ID único diferente. – ferhan

+0

OK. Su enfoque se ve bien, entonces? – Ravi

Cuestiones relacionadas