2012-06-18 49 views
23

Considere unas pocas instancias de servidores web que se ejecutan en paralelo. Cada servidor contiene una referencia a un único "guardián de estado" compartido, cuya función es mantener las últimas solicitudes N de todos los servidores.Buffer circular Thread-safe en Java

Por ejemplo (N=3):

Server a: "Request id = ABCD"  Status keeper=["ABCD"] 
Server b: "Request id = XYZZ"  Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"  Status keeper=["ABCD", "XYZZ", "1234"] 
Server b: "Request id = FOO"   Status keeper=["XYZZ", "1234", "FOO"] 
Server a: "Request id = BAR"   Status keeper=["1234", "FOO", "BAR"] 

En cualquier punto en el tiempo, el "guardián de estado" puede ser llamado desde una aplicación de monitoreo que lee estos últimos N solicitudes de un informe de SLA.

¿Cuál es la mejor manera de implementar este escenario productor-consumidor en Java, dando a los servidores web una mayor prioridad que el informe SLA?

CircularFifoBuffer parece ser la estructura de datos adecuada para contener las solicitudes, pero no estoy seguro de cuál es la forma óptima de implementar concurrencia eficiente.

+0

Define "prioridad más alta". ¿Qué sucede si el informe ha comenzado a leer el búfer? ¿Debería romperse y volver a empezar si alguien quiere escribirlo? ¿Eso a su vez puede conducir al hambre? –

+0

Nunca debería morir de hambre y nunca debería detenerse, pero puede esperar un poco más, lo que significa que su prioridad debería aumentar lentamente con el tiempo. –

+0

Cuantos productores y cuántos consumidores debe tener el buffer en anillo, voy a soltar algunos códigos cuando proporcione datos. – bestsss

Respuesta

16
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer()); 
+0

+1 ¿No debería 'fifo' ser volátil? –

+2

En realidad, no importa, siempre que el código de inicialización no sea posible de rastrear – MahdeTo

+0

¿De dónde viene BufferUtils? Intenté usar esto desde Apache, en el archivo gradle: "compilar" org.apache.commons: commons-collections4: 4.1 '", pero no está allí ... –

2

Me gustaría echar un vistazo a ArrayDeque, o para una implementación más concurrente echar un vistazo a la biblioteca Disruptor que es uno de los búfers de anillo más sofisticados/complejos en Java.

Una alternativa es usar una cola ilimitada que sea más concurrente ya que el productor nunca necesita esperar al consumidor. Java Chronicle

A menos que sus necesidades justifiquen la complejidad, un ArrayDeque puede ser todo lo que necesita.

+0

Un problema importante: 'ArrayDeque' no tiene un tamaño limitado. Utiliza una matriz circular, cierto, pero cambiará de tamaño para acomodar más elementos según sea necesario. El OP tendría que 'pop()' manualmente un elemento antes de insertar uno nuevo después de un tiempo, y al mismo tiempo mantener explícitamente la seguridad del hilo ... – thkala

+1

Si necesita que sea de tamaño limitado, puede usar ArrayBlockingQueue. –

+1

'ArrayBlockingQueue' limita su tamaño mediante el bloqueo hasta que se elimine un elemento. Por lo que puedo decir, el OP quiere que la cola implícitamente coloque/sobreescriba el elemento más antiguo, solo manteniendo los últimos elementos 'N'. – thkala

7

Aquí hay una implementación de búfer de anillo sin bloqueo. Implementa un buffer de tamaño fijo; no hay funcionalidad FIFO. Sugeriría que almacene un Collection de solicitudes para cada servidor en su lugar. De esta forma, su informe puede filtrar en lugar de filtrar su estructura de datos.

/** 
* Container 
* --------- 
* 
* A lock-free container that offers a close-to O(1) add/remove performance. 
* 
*/ 
public class Container<T> implements Iterable<T> { 

    // The capacity of the container. 
    final int capacity; 
    // The list. 
    AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(); 
    // TESTING { 
    AtomicLong totalAdded = new AtomicLong(0); 
    AtomicLong totalFreed = new AtomicLong(0); 
    AtomicLong totalSkipped = new AtomicLong(0); 

    private void resetStats() { 
    totalAdded.set(0); 
    totalFreed.set(0); 
    totalSkipped.set(0); 
    } 
    // TESTING } 

    // Constructor 
    public Container(int capacity) { 
    this.capacity = capacity; 
    // Construct the list. 
    Node<T> h = new Node<T>(); 
    Node<T> it = h; 
    // One created, now add (capacity - 1) more 
    for (int i = 0; i < capacity - 1; i++) { 
     // Add it. 
     it.next = new Node<T>(); 
     // Step on to it. 
     it = it.next; 
    } 
    // Make it a ring. 
    it.next = h; 
    // Install it. 
    head.set(h); 
    } 

    // Empty ... NOT thread safe. 
    public void clear() { 
    Node<T> it = head.get(); 
    for (int i = 0; i < capacity; i++) { 
     // Trash the element 
     it.element = null; 
     // Mark it free. 
     it.free.set(true); 
     it = it.next; 
    } 
    // Clear stats. 
    resetStats(); 
    } 

    // Add a new one. 
    public Node<T> add(T element) { 
    // Get a free node and attach the element. 
    totalAdded.incrementAndGet(); 
    return getFree().attach(element); 
    } 

    // Find the next free element and mark it not free. 
    private Node<T> getFree() { 
    Node<T> freeNode = head.get(); 
    int skipped = 0; 
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free. 
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { 
     skipped += 1; 
     freeNode = freeNode.next; 
    } 
    // Keep count of skipped. 
    totalSkipped.addAndGet(skipped); 
    if (skipped < capacity) { 
     // Put the head as next. 
     // Doesn't matter if it fails. That would just mean someone else was doing the same. 
     head.set(freeNode.next); 
    } else { 
     // We hit the end! No more free nodes. 
     throw new IllegalStateException("Capacity exhausted."); 
    } 
    return freeNode; 
    } 

    // Mark it free. 
    public void remove(Node<T> it, T element) { 
    totalFreed.incrementAndGet(); 
    // Remove the element first. 
    it.detach(element); 
    // Mark it as free. 
    if (!it.free.compareAndSet(false, true)) { 
     throw new IllegalStateException("Freeing a freed node."); 
    } 
    } 

    // The Node class. It is static so needs the <T> repeated. 
    public static class Node<T> { 

    // The element in the node. 
    private T element; 
    // Are we free? 
    private AtomicBoolean free = new AtomicBoolean(true); 
    // The next reference in whatever list I am in. 
    private Node<T> next; 

    // Construct a node of the list 
    private Node() { 
     // Start empty. 
     element = null; 
    } 

    // Attach the element. 
    public Node<T> attach(T element) { 
     // Sanity check. 
     if (this.element == null) { 
     this.element = element; 
     } else { 
     throw new IllegalArgumentException("There is already an element attached."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    // Detach the element. 
    public Node<T> detach(T element) { 
     // Sanity check. 
     if (this.element == element) { 
     this.element = null; 
     } else { 
     throw new IllegalArgumentException("Removal of wrong element."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    public T get() { 
     return element; 
    } 

    @Override 
    public String toString() { 
     return element != null ? element.toString() : "null"; 
    } 
    } 

    // Provides an iterator across all items in the container. 
    public Iterator<T> iterator() { 
    return new UsedNodesIterator<T>(this); 
    } 

    // Iterates across used nodes. 
    private static class UsedNodesIterator<T> implements Iterator<T> { 
    // Where next to look for the next used node. 

    Node<T> it; 
    int limit = 0; 
    T next = null; 

    public UsedNodesIterator(Container<T> c) { 
     // Snapshot the head node at this time. 
     it = c.head.get(); 
     limit = c.capacity; 
    } 

    public boolean hasNext() { 
     // Made into a `while` loop to fix issue reported by @Nim in code review 
     while (next == null && limit > 0) { 
     // Scan to the next non-free node. 
     while (limit > 0 && it.free.get() == true) { 
      it = it.next; 
      // Step down 1. 
      limit -= 1; 
     } 
     if (limit != 0) { 
      next = it.element; 
     } 
     } 
     return next != null; 
    } 

    public T next() { 
     T n = null; 
     if (hasNext()) { 
     // Give it to them. 
     n = next; 
     next = null; 
     // Step forward. 
     it = it.next; 
     limit -= 1; 
     } else { 
     // Not there!! 
     throw new NoSuchElementException(); 
     } 
     return n; 
    } 

    public void remove() { 
     throw new UnsupportedOperationException("Not supported."); 
    } 
    } 

    @Override 
    public String toString() { 
    StringBuilder s = new StringBuilder(); 
    Separator comma = new Separator(","); 
    // Keep counts too. 
    int usedCount = 0; 
    int freeCount = 0; 
    // I will iterate the list myself as I want to count free nodes too. 
    Node<T> it = head.get(); 
    int count = 0; 
    s.append("["); 
    // Scan to the end. 
    while (count < capacity) { 
     // Is it in-use? 
     if (it.free.get() == false) { 
     // Grab its element. 
     T e = it.element; 
     // Is it null? 
     if (e != null) { 
      // Good element. 
      s.append(comma.sep()).append(e.toString()); 
      // Count them. 
      usedCount += 1; 
     } else { 
      // Probably became free while I was traversing. 
      // Because the element is detached before the entry is marked free. 
      freeCount += 1; 
     } 
     } else { 
     // Free one. 
     freeCount += 1; 
     } 
     // Next 
     it = it.next; 
     count += 1; 
    } 
    // Decorate with counts "]used+free". 
    s.append("]").append(usedCount).append("+").append(freeCount); 
    if (usedCount + freeCount != capacity) { 
     // Perhaps something was added/freed while we were iterating. 
     s.append("?"); 
    } 
    return s.toString(); 
    } 
} 

Tenga en cuenta que esto es cerca de O1 poner y obtener. Un Separator simplemente emite "" la primera vez y luego su parámetro a partir de ese momento.

Editar: Se agregaron métodos de prueba.

// ***** Following only needed for testing. ***** 
private static boolean Debug = false; 
private final static String logName = "Container.log"; 
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\"); 

private static synchronized void log(boolean toStdoutToo, String s) { 
    if (Debug) { 
    if (toStdoutToo) { 
     System.out.println(s); 
    } 
    log(s); 
    } 
} 

private static synchronized void log(String s) { 
    if (Debug) { 
    try { 
     log.writeLn(logName, s); 
    } catch (IOException ex) { 
     ex.printStackTrace(); 
    } 
    } 
} 
static volatile boolean testing = true; 

// Tester object to exercise the container. 
static class Tester<T> implements Runnable { 
    // My name. 

    T me; 
    // The container I am testing. 
    Container<T> c; 

    public Tester(Container<T> container, T name) { 
    c = container; 
    me = name; 
    } 

    private void pause() { 
    try { 
     Thread.sleep(0); 
    } catch (InterruptedException ex) { 
     testing = false; 
    } 
    } 

    public void run() { 
    // Spin on add/remove until stopped. 
    while (testing) { 
     // Add it. 
     Node<T> n = c.add(me); 
     log("Added " + me + ": " + c.toString()); 
     pause(); 
     // Remove it. 
     c.remove(n, me); 
     log("Removed " + me + ": " + c.toString()); 
     pause(); 
    } 
    } 
} 
static final String[] strings = { 
    "One", "Two", "Three", "Four", "Five", 
    "Six", "Seven", "Eight", "Nine", "Ten" 
}; 
static final int TEST_THREADS = Math.min(10, strings.length); 

public static void main(String[] args) throws InterruptedException { 
    Debug = true; 
    log.delete(logName); 
    Container<String> c = new Container<String>(10); 

    // Simple add/remove 
    log(true, "Simple test"); 
    Node<String> it = c.add(strings[0]); 
    log("Added " + c.toString()); 
    c.remove(it, strings[0]); 
    log("Removed " + c.toString()); 

    // Capacity test. 
    log(true, "Capacity test"); 
    ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length); 
    // Fill it. 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    // Add one more. 
    try { 
    c.add("Wafer thin mint!"); 
    } catch (IllegalStateException ise) { 
    log("Full!"); 
    } 
    c.clear(); 
    log("Empty: " + c.toString()); 

    // Iterate test. 
    log(true, "Iterator test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    } 
    StringBuilder all = new StringBuilder(); 
    Separator sep = new Separator(","); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("All: "+all); 
    for (int i = 0; i < strings.length; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    } 
    sep.reset(); 
    all.setLength(0); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("None: " + all.toString()); 

    // Multiple add/remove 
    log(true, "Multi test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    log("Filled " + c.toString()); 
    for (int i = 0; i < strings.length - 1; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    log("Removed " + strings[i] + " " + c.toString()); 
    } 
    c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]); 
    log("Empty " + c.toString()); 

    // Multi-threaded add/remove 
    log(true, "Threads test"); 
    c.clear(); 
    for (int i = 0; i < TEST_THREADS; i++) { 
    Thread t = new Thread(new Tester<String>(c, strings[i])); 
    t.setName("Tester " + strings[i]); 
    log("Starting " + t.getName()); 
    t.start(); 
    } 
    // Wait for 10 seconds. 
    long stop = System.currentTimeMillis() + 10 * 1000; 
    while (System.currentTimeMillis() < stop) { 
    Thread.sleep(100); 
    } 
    // Stop the testers. 
    testing = false; 
    // Wait some more. 
    Thread.sleep(1 * 100); 
    // Get stats. 
    double added = c.totalAdded.doubleValue(); 
    double skipped = c.totalSkipped.doubleValue(); 
    //double freed = c.freed.doubleValue(); 
    log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped)/added) + ")"); 
} 
+0

¿Tiene alguna verificación formal de la corrección de este algoritmo? Las estructuras de datos sin bloqueo son muy difíciles de corregir, a menos que evite reutilizar los nodos ... – thkala

+0

@thkala: ¿qué tan "formal" necesita? El algoritmo principal está en el método 'getFree' que selecciona un nodo libre y lo marca para su uso. Es bastante simple y su corrección debe ser evidente. He agregado mis métodos de prueba. Quizás ellos ayuden. – OldCurmudgeon

+0

El tipo de algoritmos 'formales' publicados y revisados ​​por pares.He trabajado extensamente con estructuras de datos sin bloqueo y puede ser extremadamente difícil hacerlo bien. Hay demasiados casos de esquina ... – thkala

1

también echar un vistazo a java.util.concurrent.

colas de bloqueo se bloqueará hasta que hay algo de consumir o espacio (opcionalmente) para producir:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

concurrente cola enlazada es no bloqueante y utiliza un algoritmo de mancha que permite a un productor y el consumidor a activar de forma simultánea:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

+0

CLQ está desatado, no funciona. – bestsss

1

de Hazelcast Queue ofrece casi todo lo que pida, pero no apo t circularidad. Pero según su descripción, no estoy seguro si realmente lo necesita.

0

Si fuera yo, usaría el CircularFIFOBuffer como lo indicó, y sincronizaré alrededor del búfer al escribir (agregar). Cuando la aplicación de monitoreo quiere leer el búfer, sincronícelo en el búfer y luego cópielo o clónelo para usarlo en el informe.

Esta sugerencia se basa en la suposición de que la latencia es mínima para copiar/clonar el búfer a un nuevo objeto. Si hay una gran cantidad de elementos y el tiempo de copiado es lento, entonces esta no es una buena idea.

ejemplo pseudo-código:

public void writeRequest(String requestID) { 
    synchronized(buffer) { 
     buffer.add(requestID); 
    } 
} 

public Collection<String> getRequests() { 
    synchronized(buffer) { 
     return buffer.clone(); 
    } 
} 
2

Tal vez usted quiere mirar Disruptor - Concurrent Programming Framework.

  • Buscar un artículo que describe las alternativas de diseño y también un comparement rendimiento a java.util.concurrent.ArrayBlockingQueue aquí: pdf
  • considerar para leer los tres primeros artículos de BlogsAndArticles

Si la biblioteca es demasiado, palo a java.util.concurrent.ArrayBlockingQueue