2012-04-02 23 views
6

Tengo varios trabajadores que usan ArrayBlockingQueue.Java: Producer = Consumer, ¿cómo saber cuándo parar?

Cada trabajador toma un objeto de la cola, lo procesa y, como resultado, puede obtener varios objetos, que se colocarán en cola para su posterior procesamiento. Entonces, trabajador = productor + consumidor.

Trabajador:

public class Worker implements Runnable 
{ 
    private BlockingQueue<String> processQueue = null; 

    public Worker(BlockingQueue<String> processQueue) 
    { 
     this.processQueue = processQueue; 
    } 

    public void run() 
    { 
     try 
     { 
      do 
      { 
       String item = this.processQueue.take(); 
       ArrayList<String> resultItems = this.processItem(item); 

       for(String resultItem : resultItems) 
       { 
        this.processQueue.put(resultItem); 
       } 
      } 
      while(true); 
     } 
     catch(Exception) 
     { 
      ... 
     } 
    } 

    private ArrayList<String> processItem(String item) throws Exception 
    { 
     ... 
    } 
} 

principal:

public class Test 
{ 
    public static void main(String[] args) throws Exception 
    { 
     new Test().run(); 
    } 

    private void run() throws Exception 
    { 
     BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000); 
     processQueue.put("lalala"); 

     Executor service = Executors.newFixedThreadPool(100); 
     for(int i=0; i<100; ++i) 
     { 
      service.execute(new Worker(processQueue)); 
     } 
    } 
} 

¿Cuál es la mejor manera de evitar que los trabajadores, cuando no hay más trabajo es?

En primer lugar, lo que tengo en mente, es comprobar periódicamente cuántos elementos en cola y cuántos elementos están actualmente en proceso. Si ambos son iguales a cero, haga algo como "shutdownNow()" en ExecutorService. Pero no estoy seguro de que esta sea la mejor manera.

+0

La pregunta más interesante es cuál será su protección contra los patrones cíclicos, obviamente la cola puede reproducirse en algunas circunstancias. – Osw

+0

¿Podría aclarar el caso de uso? ¿Desea que la aplicación se termine sola cuando la cola está vacía o que cuando se realiza una solicitud a la aplicación para que se cierre, lo hará solo después de que la cola esté vacía? – Handerson

+0

@Osw, ¿qué quiere decir con "patrón cíclico" aquí? –

Respuesta

2

Si no hay más trabajo que hacer, coloque un mensaje en la fila de espera y dígale a los trabajadores que se cierren a su conveniencia. Esta es una buena forma de evitar la corrupción de datos.

Si necesita notificar a otro hilo que todos los trabajadores se han ido a su casa, puede usar un CountDownLatch para hacerlo.

+1

No puedo usar "Poison Pill" porque no tengo un productor, eso lo sé, cuando ya no tiene más trabajo para producir. Acerca de CountDownLatch: para hacerlo, todo trabajador debe saber, cuando ya no hay más trabajo :) –

1

Parece que tiene su solución: use una fila en progreso, cuyo tamaño será la cantidad de elementos que se están procesando actualmente. Si usa la convención de que los accesos a cualquiera de las colas están en los bloques synchronized(theArrayBlockingQueue), entonces todos deberían estar bien. En particular, cuando mueva un elemento al estado de procesamiento, elimínelo de ArrayBlockingQueue y añádalo a la línea de procesamiento dentro del mismo bloque sincronizado .

+0

Gracias por responder :) ¿De verdad necesito "processingQueue", o AtomicInteger (que representa, cuántos elementos están actualmente en proceso?) seria suficiente ? Puede ser que no sé algo. –

+0

Me gusta tener 'estado' representado en estructuras de datos en lugar de variables locales de hilos.Por ejemplo, podría como hilo conductor, sincronizar en ArrayBlockingQueue, matar a todos los trabajadores, devolver los elementos de la cola de trabajo en progreso a ArrayBlockingQueue y luego reiniciar los trabajadores. Mucho más difícil de hacer si tiene que comunicarse con hilos para recuperar su estado. – karmakaze

0

He modificado ligeramente su código, no estoy seguro si eso es lo que espera, ¡pero al menos termina! Si usa shutdownNow en lugar de shutdown, sus trabajadores serán interrumpidos y, a menos que los vuelva a poner a trabajar, saldrán sin garantía de que la cola esté vacía.

public class Test { 

    public static void main(String[] args) throws Exception { 
     new Test().run(); 
    } 

    private void run() throws Exception { 
     BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000); 
     processQueue.put("lalalalalalalalalalalalala"); //a little longer to make sure there is enough to process 

     ExecutorService service = Executors.newFixedThreadPool(100); 
     for (int i = 0; i < 100; ++i) { 
      service.execute(new Worker(processQueue)); 
     } 
     service.shutdown(); //orderly shutdown = lets the tasks terminate what they are doing 
     service.awaitTermination(1, TimeUnit.SECONDS); //blocks until all tasks have finished or throws TimeOutException if timeout is reached 
    } 

    public static class Worker implements Runnable { 

     private BlockingQueue<String> processQueue = null; 
     private int count = 0; 

     public Worker(BlockingQueue<String> processQueue) { 
      this.processQueue = processQueue; 
     } 

     @Override 
     public void run() { 
      try { 
       do { 
        //tries to get something from the queue for 100ms and returns null if it could not get anything 
        String item = this.processQueue.poll(100, TimeUnit.MILLISECONDS); 
        if (item == null) break; //Ends the job because the queue was empty 
        count++; 
        List<String> resultItems = this.processItem(item); 

        for (String resultItem : resultItems) { 
         this.processQueue.put(resultItem); 
        } 
       } while (true); 
      } catch (InterruptedException e) { 
       System.out.println("Interrupted"); 
       Thread.currentThread().interrupt(); 
      } 
      if (count != 0) System.out.println(Thread.currentThread() + ": processed " + count + " entries"); 
     } 

     private List<String> processItem(String item) { //let's put the string back less final character 
      if (item.isEmpty()) { 
       return Collections.<String> emptyList(); 
      } else { 
       return Arrays.asList(item.substring(0, item.length() - 1)); 
      } 
     } 
    } 
} 
Cuestiones relacionadas