2011-12-21 9 views
6

tiene problemas con la comunicación entre subprocesos y lo "resuelve" usando "mensajes ficticios" por todas partes. ¿Es una mala idea? ¿Cuáles son las posibles soluciones?Comunicación interproceso productor-consumidor

Ejemplo de problema que tengo.

El hilo principal inicia un hilo para procesar e insertar registros en la base de datos. hilo principal lee un archivo posiblemente enorme y coloca un registro (objeto) tras otro en una cola de bloqueo. el hilo de procesamiento lee de la cola y funciona.

¿Cómo digo "proceso de hilo" para detener? La cola puede estar vacía pero el trabajo no está terminado y el hilo principal no está ahora cuando el hilo de procesamiento ha terminado de funcionar y no puede interrumpirlo.

hilo para el procesamiento hace

while (queue.size() > 0 || !Thread.currentThread().isInterrupted()) { 
    MyObject object= queue.poll(100, TimeUnit.MILLISECONDS); 
    if (object != null) { 
     String data = object.getData(); 
     if (data.equals("END")) { 
      break; 
     } 
    // do work 
    } 
} 
// clean-up 
synchronized queue) { 
    queue.notifyAll(); 
} 
return; 

hilo principal y

// ...start processing thread... 
while(reader.hasNext(){ 
    // ...read whole file and put data in queue... 
} 
MyObject dummy = new MyObject(); 
dummy.setData("END"); 
queue.put(dummy); 
//Note: empty queue here means work is done 
while (queue.size() > 0) { 
    synchronized (queue) { 
     queue.wait(500); // over-cautios locking prevention i guess 
    } 
} 

Tenga en cuenta que la inserción esté en la misma transacción y transacción no puede ser manejado por hilo principal.

¿Cuál sería una mejor forma de hacerlo? (Estoy aprendiendo y no quiero comenzar a "hacerlo de la manera incorrecta")

Respuesta

4

Este mensaje ficticio es válido. Se llama "veneno". Algo que el productor envía al consumidor para que se detenga.

Otra posibilidad es llamar a Thread.interrupt() en algún lugar del hilo principal y atrapar y manejar la interruptedException en consecuencia, en el hilo del trabajador.

+0

Ok. ¿Cómo usaría esto si el tipo de objetos pasados ​​no tiene un campo String que pueda usarse fácilmente para esto? ¿Qué pasa si algún texto podría ser datos válidos y por lo tanto podría ser igual al veneno? Use un GUID? –

+1

Lo ideal sería cambiar la clase MyObject y agregar un método isPoison() allí. La forma en que este método determina si el objeto es un veneno o no depende de usted; puede ser algo simple como comparar si el mensaje es nulo, está vacío o si el mensaje fue creado explícitamente como veneno. Una idea sería refactorizar MyObject para que sea una interfaz con dos implementaciones, un MessageObject que siempre devuelve falso en el método isPoison() y un PoisonMessage, que siempre devuelve verdadero. Pero esto depende profundamente de qué es exactamente lo que está haciendo, y pueden existir mejores soluciones dado su contexto real. –

2

"lo resolvió" mediante el uso de "mensajes falsos" por todo el lugar. ¿Es esta una mala idea ? ¿Cuáles son las posibles soluciones?

No es una mala idea, se llama "Poison Pills" y es una forma razonable de detener un servicio basado en subprocesos.

Pero solo funciona cuando se conoce el número de productores y consumidores.

En el código que publicó, hay dos hilos, uno es "hilo principal", que produce datos, el otro es "hilo de procesamiento", que consume datos, las "píldoras venenosas" funcionan bien para esta circunstancia.

Pero imagínese, si también tiene otros productores, cómo sabe el consumidor cuándo detenerse (solo cuando todos los productores envíen "Poison Pills"), necesita saber exactamente el número de todos los productores, y verificar el número de "Poison Pills" en el consumidor, si es igual a la cantidad de productores, lo que significa que todos los productores dejaron de funcionar, el consumidor se detiene.

En "hilo principal", debe capturar InterruptedException, ya que si no, "hilo principal" podría no ser capaz de configurar la "píldora venenosa". Puede hacerlo, como a continuación,

... 
try { 
    // do normal processing 
} catch (InterruptedException e) { /* fall through */ } 
finally { 
    MyObject dummy = new MyObject(); 
    dummy.setData("END"); 
    ... 
} 
... 

También, se puede tratar de utilizar la ExecutorService a solucionar todos sus problemas.

(funciona cuando sólo se necesita hacer algunos trabajos y luego se detiene cuando todos están terminados)

void doWorks(Set<String> works, long timeout, TimeUnit unit) 
    throws InterruptedException { 
    ExecutorService exec = Executors.newCachedThreadPool(); 
    try { 
     for (final String work : works) 
      exec.execute(new Runnable() { 
        public void run() { 
         ... 
        } 
       }); 
    } finally { 
     exec.shutdown(); 
     exec.awaitTermination(timeout, unit); 
    } 
} 

estoy aprendiendo y no quiero empezar a "hacer el camino equivocado"

Es posible que tenga que leer el Libro: Concurrencia de Java en la práctica. Confía en mí, es lo mejor.

+0

Conozco el servicio del ejecutor pero la lectura de los datos y el procesamiento (validación) y la inserción no pueden estar en la misma tarea o método porque todos deberían ejecutarse en la misma transacción de la base de datos. Si entiendes lo que quiero decir –

+0

OK, entiendo eso, 'ExecutorService' es solo mi recomendación general, úsala o no según tus requisitos. Pero tenga cuidado de detectar todas las excepciones, si algunas excepciones no son detectadas, puede perder la oportunidad de establecer las "píldoras venenosas". –

0

Lo que podría hacer (lo que hice en un proyecto reciente) es envolver la cola y luego agregar un método 'isOpen()'.

class ClosableQ<T> { 

    boolean isOpen = true; 

    private LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>(); 

    public void put(T someObject) { 
     if (isOpen) { 
     lbq.put(someObject); 
     } 
    } 

    public T get() { 
     if (isOpen) { 
     return lbq.get(0); 
     } 
    } 

    public boolean isOpen() { 
     return isOpen; 
    } 

    public void open() { 
     isOpen = true; 
    } 

    public void close() { 
     isOpen = false; 
    } 
} 

Así que su subproceso de escritura se convierte en algo así como:

while (reader.hasNext()) { 
    // read the file and put it into the queue 
    dataQ.put(someObject); 
} 
// now we're done 
dataQ.close(); 

y el hilo lector:

while (dataQ.isOpen) { 
    someObject = dataQ.get(); 
} 

Se podría, por supuesto, ampliar la lista en su lugar, pero que le da al usuario un nivel de acceso que quizás no desees Y necesita agregar algunas características de concurrencia a este código, como AtomicBoolean.

+0

Estoy pasando la cola a un método. Leer tu idea me hizo tener lo siguiente. El método también podría aceptar un mantenimiento de AtomicBoolean. Entonces puede ejecutarse siempre que queue.size> 0 || keepProcessing.get() –