2012-03-03 14 views
5

Tengo una situación de un solo productor y un solo consumidor trabajando con una cola de objetos. Hay dos situaciones en las que la cola puede estar vacía:Java LinkedBlockingQueue con la capacidad de señalizar cuando haya terminado?

  1. El consumidor manejó los objetos más rápido de lo que el productor podía generar nuevos objetos (el productor usa E/S antes de generar objetos).
  2. El productor ha terminado la generación de objetos.

Si la cola está vacía, quiero que el consumidor espere hasta que haya un nuevo objeto disponible o hasta que el productor indique que ya está hecho.

Mi investigación hasta ahora no me ha llevado a ninguna parte porque todavía terminé con un bucle que comprueba tanto la cola como una marca booleana separada (isDone). Dado que no hay forma de esperar en bloqueos múltiples (pensando en esperar en la cola Y en la bandera), ¿qué se puede hacer para resolver esto?

+0

Agregué una solución propuesta a continuación si a alguien le importa revisarla. ¿Fue correcto agregarlo como respuesta? – Yon

Respuesta

2

En primer lugar, la sugerencia de que el uso de una envoltura es "demasiado trabajo" es una conjetura, y la OMI uno muy malo. Esta suposición debe ser medida con una prueba de rendimiento con los requisitos reales. Si y solo si la prueba falla, entonces verifique usando un generador de perfiles que el envoltorio del objeto de cola es el motivo.

Aún si haces eso y envolver el objeto queue (en este caso un String) realmente es la causa de un rendimiento inaceptable, puedes utilizar esta técnica: crear una cadena conocida y única para servir como un "final de mensajes" "mensaje"

public static final String NO_MORE_MESSAGES = UUID.randomUUID().toString(); 

Luego, cuando la recuperación de Cuerdas de la cola, sólo echa (que puede ser una verificación de referencia) si la cadena es NO_MORE_MESSAGES. Si es así, entonces has terminado de procesar.

+0

He respondido a @esaj sobre esto también. Supongamos que tenemos un String suficientemente único, ¿no sería una pérdida la sobrecarga adicional de ejecutar "iguales" en todas las cadenas (miles por cola, cientos de colas por minuto)? – Yon

+1

Observe la respuesta: dado que es un campo final estático, puede hacer una comprobación de igualdad de referencia (==, not .equals()) para la cadena especial. –

+0

¡Buen punto, me lo he perdido! – Yon

1

Simple. Define un objeto especial que el productor puede enviar para señalar "hecho".

+0

En la mayoría de los casos, la cola es de cadenas. Preferimos no ajustar el objeto String porque generará una sobrecarga adicional. – Yon

+0

Luego use un valor de cadena que no puede ocurrir en sus datos normales. ¿Qué tal una cadena de longitud cero? –

1

Una opción es envolver sus datos en un objeto de soporte, que puede usarse para señalar el final del procesamiento.

Por ejemplo:

public class QueueMessage { 
public MessageType type; 
public Object thingToWorkOn; 
} 

donde MessageType es una enumeración que define un mensaje de "trabajo" o un mensaje de "cierre".

+0

Esto es bastante sobrecargado, especialmente si es una cola de miles de cadenas, ¿no crees? – Yon

+0

@Yon: No realmente. –

1

Puede usar LinkedBlockingQueuespoll(long timeout, TimeUnit unit) -metodo en el consumidor, y si devuelve nulo (el tiempo de espera transcurrido), marque el indicador booleano. Otra manera sería pasar algún objeto especial "EndOfWork" en la cola como el último, para que el consumidor sepa que es el final del trabajo.

Sin embargo, otra forma sería interrumpir el hilo del consumidor de la cadena del productor, pero esto requeriría que el hilo productor tenga en cuenta al consumidor. Si ambos se implementaran como clases anidadas, podría usar la clase padre para mantener un valor de ejecución booleano, al que ambos podrían acceder, y terminar ambos hilos con un solo booleano.

+0

La encuesta con el temporizador es algo en lo que pensé, pero aún me pareció un desperdicio. El uso de un objeto especial es problemático aquí porque la cola es normalmente de cadenas y preferimos no envolverlos (provoca una sobrecarga adicional). – Yon

+0

¿Podría usar alguna cadena especial para denotar el final del trabajo y que el productor la escriba como la última cadena en cola? Algo corto que nunca aparecerá en el trabajo normal: cadenas, luego simplemente use equals() para ver si el trabajo-String es exactamente eso, y si es así, rescinda al consumidor. – esaj

+0

El contenido de las cadenas es altamente dinámico. Si quisiéramos crear una cadena de este tipo, probablemente sería bastante larga y usaría caracteres no estándar, lo que aumentaría la carga de la CPU tantas veces. – Yon

1

La siguiente opción se ha planteado también (no estoy seguro si esto debería estar en una respuesta a mí mismo, pero no pudo encontrar un lugar mejor para escribir esto):

crear un contenedor para la cola. Este envoltorio tendrá un monitor que el consumidor leerá cuando lo lea y el productor lo notificará cada vez que se agregue un objeto nuevo o se levante el indicador de isDone.

Cuando el consumidor lee objetos de la cola, estos objetos se envolverán con algo similar a lo que @ yann-ramin sugirió anteriormente. Sin embargo, para reducir la sobrecarga, el consumidor proporcionará una instancia única y reutilizable de QueueMessage en cada llamada leída (siempre será la misma instancia). El contenedor de cola actualizará los campos en consecuencia antes de devolver la instancia al consumidor.

Esto evita el uso de tiempos de espera, duerme, etc.

EDITADO Esta es una propuesta de ejecución:

/** 
* This work queue is designed to be used by ONE producer and ONE consumer 
* (no more, no less of neither). The work queue has certain added features, such 
* as the ability to signal that the workload generation is done and nothing will be 
* added to the queue. 
* 
* @param <E> 
*/ 
public class DefiniteWorkQueue<E> { 
    private final E[] EMPTY_E_ARRAY; 
    private LinkedBlockingQueue<E> underlyingQueue = new LinkedBlockingQueue<E>(); 
    private boolean isDone = false; 

    // This monitor allows for flagging when a change was done. 
    private Object changeMonitor = new Object(); 

    public DefiniteWorkQueue(Class<E> clazz) { 
     // Reuse this instance, makes calling toArray easier 
     EMPTY_E_ARRAY = (E[]) Array.newInstance(clazz, 0); 
    } 

    public boolean isDone() { 
     return isDone; 
    } 

    public void setIsDone() { 
     synchronized (changeMonitor) { 
      isDone = true; 
      changeMonitor.notifyAll(); 
     } 
    } 

    public int size() { 
     return underlyingQueue.size(); 
    } 

    public boolean isEmpty() { 
     return underlyingQueue.isEmpty(); 
    } 

    public boolean contains(E o) { 
     return underlyingQueue.contains(o); 
    } 

    public Iterator<E> iterator() { 
     return underlyingQueue.iterator(); 
    } 

    public E[] toArray() { 
     // The array we create is too small on purpose, the underlying 
     // queue will extend it as needed under a lock 
     return underlyingQueue.toArray(EMPTY_E_ARRAY); 
    } 

    public boolean add(E o) { 
     boolean retval; 
     synchronized (changeMonitor) { 
      retval = underlyingQueue.add(o); 
      if (retval) 
       changeMonitor.notifyAll(); 
     } 
     return retval; 
    } 

    public boolean addAll(Collection<? extends E> c) { 
     boolean retval; 
     synchronized (changeMonitor) { 
      retval = underlyingQueue.addAll(c); 
      if (retval) 
       changeMonitor.notifyAll(); 
     } 
     return retval; 
    } 

    public void remove(RemovalResponse<E> responseWrapper) throws InterruptedException { 
     synchronized (changeMonitor) { 
      // If there's nothing in the queue but it has not 
      // ended yet, wait for someone to add something. 
      if (isEmpty() && !isDone()) 
       changeMonitor.wait(); 

      // When we get here, we've been notified or 
      // the current underlying queue's state is already something 
      // we can respond about. 
      if (!isEmpty()) { 
       responseWrapper.type = ResponseType.ITEM; 
       responseWrapper.item = underlyingQueue.remove(); 
      } else if (isDone()) { 
       responseWrapper.type = ResponseType.IS_DONE; 
       responseWrapper.item = null; 
      } else { 
       // This should not happen 
       throw new IllegalStateException(
        "Unexpected state where a notification of change was  made but " + 
         "nothing is in the queue and work is not  done."); 
      } 
     } 
    } 

    public static class RemovalResponse<E> { 
     public enum ResponseType { 
      /** 
      * Used when the response contains the first item of the queue. 
      */ 
      ITEM, 

      /** 
      * Used when the work load is done and nothing new will arrive. 
      */ 
      IS_DONE 
     }; 

     private ResponseType type; 
     private E item; 

     public ResponseType getType() { 
      return type; 
     } 

     public void setType(ResponseType type) { 
      this.type = type; 
     } 

     public E getItem() { 
      return item; 
     } 

     public void setItem(E item) { 
      this.item = item; 
     } 

    } 
} 
Cuestiones relacionadas