2012-06-07 4 views
5

Tengo una aplicación multiproceso que tiene un subproceso productor y varios subprocesos de consumidor. Los datos se almacenan en una recopilación segura de subprocesos compartidos y se descargan a una base de datos cuando hay suficientes datos en el búfer.Almacenamiento en memoria intermedia de datos en la aplicación java multiproceso

De los javadocs -

BlockingQueue<E> 

una cola que, además, apoya las operaciones que aguardan a que la cola se convierta en no vacío al recuperar un elemento, y espera a que el espacio que estén disponibles en la cola al almacenar un elemento .

take() 

Recupera y elimina la cabeza de esta cola, a la espera de ser necesario hasta que un elemento esté disponible.

Mis preguntas -

  1. ¿Hay otra colección que tiene un método E [] tomar (int n)? es decir, la cola de bloqueo espera hasta que un elemento esté disponible. Lo que quiero es que deba esperar hasta que estén disponibles 100 o 200 elementos.
  2. Alternativamente, ¿hay otro método que podría utilizar para abordar el problema sin sondeo?
+0

caso de que los elementos se distribuirán por igual a cada consumidor, o de que las primera consumidor con el método de toma obtener los primeros 'elementos N', el segundo consumidor los próximos' elementos N', etc.? – SimonC

+0

¿Es esto realmente lo que quieres hacer?Esto puede introducir una latencia casi arbitrariamente grande entre los datos que se están produciendo y los que se descargan a la base de datos si la velocidad de producción se frena más allá de lo que terminas sintonizando. Si realmente necesita hacer este almacenamiento en búfer en toda su lógica, probablemente debería ser más como "Espere hasta que tenga N elementos o haya pasado X ms" – DRMacIver

+0

¿Por qué desea esperar? ¿Por qué no usar 'drain()'? Escribiría todos los datos que tiene disponibles hasta un máximo y preferiría no perder datos. –

Respuesta

2

Creo que la única manera es para extender alguna implementación de BlockingQueue o crear algún tipo de método de utilidad usando take:

public <E> void take(BlockingQueue<E> queue, List<E> to, int max) 
     throws InterruptedException { 

    for (int i = 0; i < max; i++) 
     to.add(queue.take()); 
} 
+0

En realidad, su enfoque es mucho mejor que el mío, suponiendo que solo haya un solo consumidor. – Zarkonnen

+1

Este enfoque no funciona bien con InterruptedException en absoluto porque pierde los elementos que se toman si se interrumpe. Realmente necesita agregar elementos a una colección pasada si no va a manejar la interrupción en sí misma, o capturar y devolver los elementos agotados hasta el momento si así lo desea. – DRMacIver

+0

Oh, buen punto +1 en el comentario. ¡Actualizado! – dacwe

1

No No estoy seguro de si hay una clase similar en la biblioteca estándar que tiene take(int n) método de tipo, pero usted debe ser capaz de envolver el defecto BlockingQueue añadir esa función sin demasiados problemas, no se ¿pensar?

Escenario alternativo sería desencadenar una acción en la que coloque elementos en la colección, donde un umbral establecido por usted desencadenaría la descarga.

2

El método drainTo no es exactamente lo que está buscando, pero ¿serviría para su propósito?

http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html#drainTo(java.util.Collection, int)

EDITAR

Se podría aplicar un lote poco más performante bloqueo takemin utilizando una combinación de take y drainTo:

public <E> void drainTo(final BlockingQueue<E> queue, final List<E> list, final int min) throws InterruptedException 
{ 
    int drained = 0; 
    do 
    { 
    if (queue.size() > 0) 
     drained += queue.drainTo(list, min - drained); 
    else 
    { 
     list.add(queue.take()); 
     drained++; 
    } 
    } 
    while (drained < min); 
} 
+1

como se menciona en otra respuesta (que ahora se elimina aparentemente), esto es todo lo que quiere hacer. – posdef

+0

He actualizado la respuesta para indicar que no resuelve la pregunta exacta. A veces, el OP no conoce soluciones alternativas, por lo que siempre vale la pena preguntar. – SimonC

+0

No puedo discutir contra eso :) – posdef

1

Así que esta debería ser una cola segura para la tarea que le permite bloquear la toma de un número arbitrario de elementos. Serán bienvenidos más ojos para verificar que el código de enhebrado sea correcto.

package mybq; 

import java.util.ArrayList; 
import java.util.LinkedList; 
import java.util.List; 

public class ChunkyBlockingQueue<T> { 
    protected final LinkedList<T> q = new LinkedList<T>(); 
    protected final Object lock = new Object(); 

    public void add(T t) { 
     synchronized (lock) { 
      q.add(t); 
      lock.notifyAll(); 
     } 
    } 

    public List<T> take(int numElements) { 
     synchronized (lock) { 
      while (q.size() < numElements) { 
       try { 
        lock.wait(); 
       } catch (InterruptedException e) { 
        Thread.currentThread().interrupt(); 
       } 
      } 
      ArrayList<T> l = new ArrayList<T>(numElements); 
      l.addAll(q.subList(0, numElements)); 
      q.subList(0, numElements).clear(); 
      return l; 
     } 
    } 
} 
+1

El 'notifyAll' en' add' es un poco derrochador aquí. Debería ser un simple 'notify' luego 'take' puede llamar a' notify' nuevamente si todavía quedan más elementos cuando está listo. – SimonC

Cuestiones relacionadas