2010-02-25 8 views
46

Me gustaría crear algún tipo de aplicación de subprocesamiento Producer/Consumer. Pero no estoy seguro de cuál es la mejor manera de implementar una cola entre los dos.Producer/Consumer threads using a Queue

Así que tengo algunas ideas (las cuales pueden ser totalmente erróneas). Me gustaría saber cuál sería mejor y si ambos apestan, ¿cuál sería la mejor forma de implementar la cola? Es principalmente mi implementación de la cola en estos ejemplos lo que me preocupa. Extiendo una clase Queue que es una clase interna y es segura para subprocesos. A continuación hay dos ejemplos con 4 clases cada uno.

principal de clase

public class SomeApp 
{ 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     consumer = new Consumer(); 
     producer = new Producer(); 
    } 
} 

Consumidor de clase

public class Consumer implements Runnable 
{ 
    public Consumer() 
    { 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = QueueHandler.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

Productor de clase

public class Producer implements Runnable 
{ 
    public Producer() 
    { 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //add to the queue some sort of unique object 
      QueueHandler.enqueue(new Object()); 
     } 
    } 
} 

cola de clase

public class QueueHandler 
{ 
    //This Queue class is a thread safe (written in house) class 
    public static Queue<Object> readQ = new Queue<Object>(100); 

    public static void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(object); 
    } 

    public static Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

O

principal de clase

public class SomeApp 
{ 
    Queue<Object> readQ; 
    private Consumer consumer; 
    private Producer producer; 

    public static void main (String args[]) 
    { 
     readQ = new Queue<Object>(100); 
     consumer = new Consumer(readQ); 
     producer = new Producer(readQ); 
    } 
} 

Consumidor de clase

public class Consumer implements Runnable 
{ 
    Queue<Object> queue; 

    public Consumer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread consumer = new Thread(this); 
     consumer.start(); 
    } 

    public void run() 
    { 
     while(true) 
     { 
      //get an object off the queue 
      Object object = queue.dequeue(); 
      //do some stuff with the object 
     } 
    } 
} 

Productor de clase

public class Producer implements Runnable 
{ 
    Queue<Object> queue; 

    public Producer(Queue<Object> readQ) 
    { 
     queue = readQ; 
     Thread producer = new Thread(this); 
     producer.start(); 
    } 

    public void run() 
    { 

     while(true) 
     { 
      //add to the queue some sort of unique object 
      queue.enqueue(new Object()); 
     } 
    } 
} 

cola de clase

//the extended Queue class is a thread safe (written in house) class 
public class QueueHandler extends Queue<Object> 
{  
    public QueueHandler(int size) 
    { 
     super(size); //All I'm thinking about now is McDonalds. 
    } 

    public void enqueue(Object object) 
    { 
     //do some stuff 
     readQ.add(); 
    } 

    public Object dequeue() 
    { 
     //do some stuff 
     return readQ.get(); 
    } 
} 

¡Y listo!

+0

Productores enqueue y Consumer dequeue, btw. No al revés ... –

+1

Ah, y no comiences los hilos de un constructor !! Ese hilo podría observar el objeto en un estado inconsistente. Consulte "Concurrencia de Java en la práctica" para más detalles. –

+0

Gracias Zwei, lo que entré fue que yo estaba desenfocado. el inicio de la tarea Subproceso del constructor ¿debería ejecutar un método de intiliazation y comenzar allí o debería iniciarlo desde la clase de método principal? – Gareth

Respuesta

69

Java 5+ tiene todas las herramientas que necesita para este tipo de cosas. Usted querrá:

  1. Ponga todos sus productores en uno ExecutorService;
  2. Ponga a todos sus consumidores en otro ExecutorService;
  3. Si es necesario, comuníquese entre los dos usando un BlockingQueue.

Di "si es necesario" para (3) porque desde mi experiencia es un paso innecesario. Todo lo que hace es enviar nuevas tareas al servicio del ejecutor del consumidor.Por lo tanto:

final ExecutorService producers = Executors.newFixedThreadPool(100); 
final ExecutorService consumers = Executors.newFixedThreadPool(100); 
while (/* has more work */) { 
    producers.submit(...); 
} 
producers.shutdown(); 
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 
consumers.shutdown(); 
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); 

Así que la producers enviar directamente a consumers.

+2

Cletus tiene razón para obtener más información que ayude a aclarar "por dónde empezar" http://java.sun.com/docs/books/tutorial/essential/concurrency/ – edwardsmatt

+0

"De modo que los productores envían directamente a los consumidores" - ¿Es seguro llamar a consumer.submit (...) en paralelo o debo sincronizarme con eso? – Gevorg

+0

Si comparte un BlockingQueue, ¿podría usar 1 ejecutor tanto para productores como para consumidores? – devo

9

Está reinventando la rueda.

Si necesita persistencia y otras funciones empresariales, use JMS (sugiero ActiveMq).

Si necesita colas rápidas en memoria, utilice una de las impedancias de Queue de java.

Si necesita admitir Java 1.4 o una versión anterior, use el excelente paquete concurrent de Doug Lea.

+4

Aún se le puede pedir que implemente Producer Consumer en una entrevista de trabajo :) –

+0

Encuentro útiles las utilidades en java.util.concurrent, pero me resulta difícil llamarlo "excelente" mientras todavía me obliga a pasar dos parámetros solo para especificar un tiempo de espera. ¿Hubiera matado a Doug para hacer una clase llamada Duration? – Trejkaz

17

OK, como otros señalan, lo mejor que se puede hacer es usar el paquete java.util.concurrent. Recomiendo "Java Concurrency in Practice". Es un gran libro que cubre casi todo lo que necesita saber.

En cuanto a su implementación particular, como noté en los comentarios, no inicie Threads from Constructors - puede no ser seguro.

Dejando eso de lado, la segunda implementación parece mejor. No desea poner colas en campos estáticos. Probablemente estés perdiendo flexibilidad por nada.

Si desea continuar con su propia implementación (¿para fines de aprendizaje, supongo?), Al menos suministre un método start(). Debería construir el objeto (puede crear una instancia del objeto Thread), y luego llamar al start() para iniciar el hilo.

Editar: ExecutorService tienen su propia cola, así que esto puede ser confuso. Aquí hay algo para empezar.

public class Main { 
    public static void main(String[] args) { 
     //The numbers are just silly tune parameters. Refer to the API. 
     //The important thing is, we are passing a bounded queue. 
     ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100)); 

     //No need to bound the queue for this executor. 
     //Use utility method instead of the complicated Constructor. 
     ExecutorService producer = Executors.newSingleThreadExecutor(); 

     Runnable produce = new Produce(consumer); 
     producer.submit(produce); 
    } 
} 

class Produce implements Runnable { 
    private final ExecutorService consumer; 

    public Produce(ExecutorService consumer) { 
     this.consumer = consumer; 
    } 

    @Override 
    public void run() { 
     Pancake cake = Pan.cook(); 
     Runnable consume = new Consume(cake); 
     consumer.submit(consume); 
    } 
} 

class Consume implements Runnable { 
    private final Pancake cake; 

    public Consume(Pancake cake){ 
     this.cake = cake; 
    } 

    @Override 
    public void run() { 
     cake.eat(); 
    } 
} 

hacer otras modificaciones: Para el productor, en lugar de while(true), se puede hacer algo como:

@Override 
public void run(){ 
    while(!Thread.currentThread().isInterrupted()){ 
     //do stuff 
    } 
} 

De esta manera usted puede apagar el ejecutor llamando .shutdownNow(). Si usa while(true), no se apagará.

También tenga en cuenta que el Producer sigue siendo vulnerable a RuntimeExceptions (es decir, uno RuntimeException detendrá el procesamiento)

+0

¿Debo agregar un método start() al consumidor y al productor? ¿Estás diciendo que debería poner algo como esto en mi método principal en vez? consumidor = nuevo Consumidor(); consumer.start (readQ); ¿o esto? consumer = new Comsumer (readQ); consumer.start(); – Gareth

+1

Normalmente haría un nuevo Comsumer (readQ); consumer.start() ;. En su caso, es aconsejable declarar que la cola es privada y, si lo hace, debe establecer la cola en el constructor. Si este es el código de producción, le recomiendo encarecidamente que vaya con la respuesta de Cletus. Si necesita absolutamente usar su cola interna, entonces debe usar ExecutorService executor = Executors.newSingleThreadExecutor() en lugar de un hilo sin formato en su lugar. Esto, entre otras cosas, lo protegerá de RuntimeException para detener su sistema. –

+0

Gracias. muy útil. Me he ido con BlockingQueue como lo sugirieron cletus en la cola interna. Todavía estoy tratando de entender la clase ExecutorService, pero cuando lo haga, definitivamente lo usaré. Gracias por tu ayuda. – Gareth

1
  1. código Java "BlockingQueue", que ha sincronizado PUT y GET método.
  2. Código Java "Productor", hilo productor para producir datos.
  3. código Java "Consumidor", hilo de consumo para consumir los datos producidos.
  4. Código de Java "ProducerConsumer_Main", función principal para iniciar el hilo productor y consumidor.

BlockingQueue.java

public class BlockingQueue 
{ 
    int item; 
    boolean available = false; 

    public synchronized void put(int value) 
    { 
     while (available == true) 
     { 
      try 
      { 
       wait(); 
      } catch (InterruptedException e) { 
      } 
     } 

     item = value; 
     available = true; 
     notifyAll(); 
    } 

    public synchronized int get() 
    { 
     while(available == false) 
     { 
      try 
      { 
       wait(); 
      } 
      catch(InterruptedException e){ 
      } 
     } 

     available = false; 
     notifyAll(); 
     return item; 
    } 
} 

Consumer.java

package com.sukanya.producer_Consumer; 

public class Consumer extends Thread 
{ 
    blockingQueue queue; 
    private int number; 
    Consumer(BlockingQueue queue,int number) 
    { 
     this.queue = queue; 
     this.number = number; 
    } 

    public void run() 
    { 
     int value = 0; 

     for (int i = 0; i < 10; i++) 
     { 
      value = queue.get(); 
      System.out.println("Consumer #" + this.number+ " got: " + value); 
     } 
    } 
} 

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer; 

public class ProducerConsumer_Main 
{ 
    public static void main(String args[]) 
    { 
     BlockingQueue queue = new BlockingQueue(); 
     Producer producer1 = new Producer(queue,1); 
     Consumer consumer1 = new Consumer(queue,1); 
     producer1.start(); 
     consumer1.start(); 
    } 
} 
+3

Los volcados de código sin explicación rara vez son útiles. – Chris

6

tengo respuesta cletus extendida propuesta al ejemplo de código de trabajo.

  1. One ExecutorService (pes) acepta Producer tasks.
  2. One ExecutorService (ces) acepta Consumer tareas.
  3. Ambos Producer y Consumer comparten BlockingQueue.
  4. Múltiples tareas Producer genera diferentes números.
  5. Cualquiera de Consumer tareas puede consumir número generado por Producer

Código:

import java.util.concurrent.*; 

public class ProducerConsumerWithES { 
    public static void main(String args[]){ 
     BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>(); 

     ExecutorService pes = Executors.newFixedThreadPool(2); 
     ExecutorService ces = Executors.newFixedThreadPool(2); 

     pes.submit(new Producer(sharedQueue,1)); 
     pes.submit(new Producer(sharedQueue,2)); 
     ces.submit(new Consumer(sharedQueue,1)); 
     ces.submit(new Consumer(sharedQueue,2)); 
     // shutdown should happen somewhere along with awaitTermination 
     /* https://stackoverflow.com/questions/36644043/how-to-properly-shutdown-java-executorservice/36644320#36644320 */ 
     pes.shutdown(); 
     ces.shutdown(); 
    } 
} 
class Producer implements Runnable { 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.threadNo = threadNo; 
     this.sharedQueue = sharedQueue; 
    } 
    @Override 
    public void run() { 
     for(int i=1; i<= 5; i++){ 
      try { 
       int number = i+(10*threadNo); 
       System.out.println("Produced:" + number + ":by thread:"+ threadNo); 
       sharedQueue.put(number); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

class Consumer implements Runnable{ 
    private final BlockingQueue<Integer> sharedQueue; 
    private int threadNo; 
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) { 
     this.sharedQueue = sharedQueue; 
     this.threadNo = threadNo; 
    } 
    @Override 
    public void run() { 
     while(true){ 
      try { 
       int num = sharedQueue.take(); 
       System.out.println("Consumed: "+ num + ":by thread:"+threadNo); 
      } catch (Exception err) { 
       err.printStackTrace(); 
      } 
     } 
    } 
} 

de salida:

Produced:11:by thread:1 
Produced:21:by thread:2 
Produced:22:by thread:2 
Consumed: 11:by thread:1 
Produced:12:by thread:1 
Consumed: 22:by thread:1 
Consumed: 21:by thread:2 
Produced:23:by thread:2 
Consumed: 12:by thread:1 
Produced:13:by thread:1 
Consumed: 23:by thread:2 
Produced:24:by thread:2 
Consumed: 13:by thread:1 
Produced:14:by thread:1 
Consumed: 24:by thread:2 
Produced:25:by thread:2 
Consumed: 14:by thread:1 
Produced:15:by thread:1 
Consumed: 25:by thread:2 
Consumed: 15:by thread:1 

Nota. Si no necesita múltiples Productores y Consumidores, mantenga solo Productor y Consumidor. He agregado varios productores y consumidores para mostrar las capacidades de BlockingQueue entre múltiples productores y consumidores.

+0

Esto no se ocupa de la condición de carrera cuando hay varios productores y consumidores. Cada uno ve la capacidad de ser 0 e intenta agregar. Con Single Producer y Single consumer no es necesario sincronizar en BlockingQueue, si es más de uno, se requiere Sincronizar. – Cleonjoys

+0

Puedes hacer una cosa, comentar a los consumidores, luego establecer el tamaño fijo para BlockingQueue, te verías a ti mismo. Probé tu código con el nuevo LinkedBlockingQueue (2); Entonces la producción fue como a continuación: Producción: 11: por Tema: 1 Producción: 21: por Tema: 2 Producción: 22: por Tema: 2 Producción: 12: por Tema: 1 ¿Cómo puede más los valores se insertan cuando la capacidad establecida de la cola era 2 – Cleonjoys

+0

Esa es la naturaleza de BlockingQueue. A menos que la capacidad esté disponible, será bloqueada. Estoy usando Cola de bloqueo ilimitada y el caso anterior no aparece. Incluso si surge debido a BlockingQueue limitado, es la forma en que Java lo implementó. Verifique https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#put-E-. El fragmento de código en mi publicación no tiene ningún problema. –

1

Este es un código muy simple.

import java.util.*; 

// @author : rootTraveller, June 2017 

class ProducerConsumer { 
    public static void main(String[] args) throws Exception { 
     Queue<Integer> queue = new LinkedList<>(); 
     Integer buffer = new Integer(10); //Important buffer or queue size, change as per need. 

     Producer producerThread = new Producer(queue, buffer, "PRODUCER"); 
     Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER"); 

     producerThread.start(); 
     consumerThread.start(); 
    } 
} 

class Producer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize ; 

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super(ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.size() == queueSize){ 
        System.out.println(Thread.currentThread().getName() + " FULL   : waiting...\n"); 
        try{ 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then produce one, add and notify 
       int randomInt = new Random().nextInt(); 
       System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
       queue.add(randomInt); 
       queue.notifyAll(); //Important 
      } //synchronized ends here : NOTE 
     } 
    } 
} 

class Consumer extends Thread { 
    private Queue<Integer> queue; 
    private int queueSize; 

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){ 
     super (ThreadName); 
     this.queue = queueIn; 
     this.queueSize = queueSizeIn; 
    } 

    public void run() { 
     while(true){ 
      synchronized (queue) { 
       while(queue.isEmpty()){ 
        System.out.println(Thread.currentThread().getName() + " Empty  : waiting...\n"); 
        try { 
         queue.wait(); //Important 
        } catch (Exception ex) { 
         ex.printStackTrace(); 
        } 
       } 

       //queue empty then consume one and notify 
       System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove()); 
       queue.notifyAll(); 
      } //synchronized ends here : NOTE 
     } 
    } 
}