2012-02-21 12 views
7

Decir que tengo un AtomicReference a una lista de objetos:AtomicReference a un objeto mutable y visibilidad

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>()); 

Tema A añade elementos a esta lista: batch.get().add(o);

Más tarde, hilo B toma la lista y, por ejemplo, la almacena en un DB: insertBatch(batch.get());

¿Tengo que hacer una sincronización adicional al escribir (Thr ead A) y lectura (Hilo B) para asegurar que el hilo B vea la lista de la manera en que A lo dejó, o ¿esto es atendido por AtomicReference?

En otras palabras: si tengo una AtomicReference para un objeto mutable, y un hilo cambia ese objeto, ¿otros hilos ven este cambio inmediatamente?

Editar:

Tal vez un código de ejemplo está en orden:

public void process(Reader in) throws IOException { 
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(4); 

    for (int i = 0; i < 4; ++i) { 
     tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() { 
      @Override public AtomicReference<List<Object>> call() throws IOException { 

       final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize)); 

       Processor.this.parser.parse(in, new Parser.Handler() { 
        @Override public void onNewObject(Object event) { 
          batch.get().add(event); 

          if (batch.get().size() >= batchSize) { 
           dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize))); 
          } 
        } 
       }); 

       return batch; 
      } 
     })); 
    } 

    List<Object> remainingBatches = new ArrayList<Object>(); 

    for (Future<AtomicReference<List<Object>>> task : tasks) { 
     try { 
      AtomicReference<List<Object>> remainingBatch = task.get(); 
      remainingBatches.addAll(remainingBatch.get()); 
     } catch (ExecutionException e) { 
      Throwable cause = e.getCause(); 

      if (cause instanceof IOException) { 
       throw (IOException)cause; 
      } 

      throw (RuntimeException)cause; 
     } 
    } 

    // these haven't been flushed yet by the worker threads 
    if (!remainingBatches.isEmpty()) { 
     dao.insertBatch(remainingBatches); 
    } 
} 

Lo que sucede aquí es que puedo crear cuatro subprocesos de trabajo para analizar un texto (este es el parámetro Reader in al método process()) . Cada trabajador guarda las líneas que ha analizado en un lote y vacía el lote cuando está lleno (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));).

Dado que el número de líneas en el texto no es un múltiplo del tamaño del lote, los últimos objetos terminan en un lote que no se vacía, ya que no está lleno. Por lo tanto, estos lotes restantes se insertan por el hilo principal.

Yo uso AtomicReference.getAndSet() para reemplazar el lote completo por uno vacío. Es este programa correcto en cuanto a enhebrar?

Respuesta

9

Um ... realmente no funciona así. AtomicReference garantiza que la referencia en sí misma sea visible en todos los hilos, es decir, si le asigna una referencia diferente a la original, la actualización será visible. No garantiza el contenido real del objeto al que apunta la referencia.

Por lo tanto, las operaciones de lectura/escritura en el contenido de la lista requieren una sincronización por separado.

Editar: A juzgar por el código actualizado y el comentario que ha publicado, establecer la referencia local en volatile es suficiente para garantizar la visibilidad.

+0

De acuerdo, agregué un código de ejemplo a mi pregunta anterior. Yo uso 'AtomicReference.getAndSet()' para reemplazar un lote completo por uno nuevo y vacío. ¿Todavía necesito una sincronización adicional? –

+1

Sí, su código es correcto, aunque el uso de 'AtomicReference' no parece ser necesario aquí. – Tudor

+0

@Tudor estaba pensando exactamente lo mismo.De hecho, es posible que getAndSet() no haga lo que quiere porque obtendrá el valor actual y luego cambiará el valor de AtomicReference. –

0

Agregando a Tudor 's Respuesta: se tiene que hacer la misma ArrayList multi-hilo o - en función de sus necesidades - incluso los bloques de código más grandes.

Si usted puede conseguir lejos con un multi-hilo ArrayList puede "decorar" de esta manera:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>()); 

Pero hay que tener en cuenta: A pesar de "simple" construcciones de este tipo son no multi-hilo con esto:

Object o = batch.get(batch.size()-1); 
0

El AtomicReference solo le ayudará con la referencia a la lista, no hará nada en la lista. Más particularmente, en su escenario, es casi seguro que se encuentre con problemas cuando el sistema está bajo carga, donde el consumidor ha tomado la lista mientras el productor le agrega un artículo.

Me parece que debería estar utilizando un BlockingQueue. A continuación, puede Limitar la huella de memoria si su productor es más rápido que su consumidor y dejar que la cola maneje toda la contención.

algo como:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50); 

// ... Producer 
queue.put(o); 

// ... Consumer 
List<Object> queueContents = new ArrayList<Object>(); 
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items. 
queue.drainTo(queueContents); 

Agregado

Gracias a @Tudor para señalar la arquitectura que está utilizando. ... Tengo que admitir que es bastante extraño. Realmente no necesita AtomicReference en absoluto hasta donde yo pueda ver. Cada hilo posee su propio ArrayList hasta que se transfiere a dao, en cuyo punto se reemplaza para que no exista ninguna contención en ningún lado.

Estoy un poco preocupado por usted la creación de cuatro analizador en un solo Reader. Espero que tengas alguna forma de garantizar que cada analizador no afecte a los demás.

Yo personalmente usaría algún tipo de patrón productor-consumidor como lo he descrito en el código anterior. Algo como esto tal vez.

static final int PROCESSES = 4; 
static final int batchSize = 10; 

public void process(Reader in) throws IOException, InterruptedException { 

    final List<Future<Void>> tasks = new ArrayList<Future<Void>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(PROCESSES); 
    // Queue of objects. 
    final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2); 
    // The final object to post. 
    final Object FINISHED = new Object(); 

    // Start the producers. 
    for (int i = 0; i < PROCESSES; i++) { 
    tasks.add(exec.submit(new Callable<Void>() { 
     @Override 
     public Void call() throws IOException { 

     Processor.this.parser.parse(in, new Parser.Handler() { 
      @Override 
      public void onNewObject(Object event) { 
      queue.add(event); 
      } 
     }); 
     // Post a finished down the queue. 
     queue.add(FINISHED); 
     return null; 
     } 
    })); 
    } 

    // Start the consumer. 
    tasks.add(exec.submit(new Callable<Void>() { 
    @Override 
    public Void call() throws IOException { 
     List<Object> batch = new ArrayList<Object>(batchSize); 
     int finishedCount = 0; 
     // Until all threads finished. 
     while (finishedCount < PROCESSES) { 
     Object o = queue.take(); 
     if (o != FINISHED) { 
      // Batch them up. 
      batch.add(o); 
      if (batch.size() >= batchSize) { 
      dao.insertBatch(batch); 
      // If insertBatch takes a copy we could merely clear it. 
      batch = new ArrayList<Object>(batchSize); 
      } 
     } else { 
      // Count the finishes. 
      finishedCount += 1; 
     } 
     } 
     // Finished! Post any incopmplete batch. 
     if (batch.size() > 0) { 
     dao.insertBatch(batch); 
     } 
     return null; 
    } 
    })); 

    // Wait for everything to finish. 
    exec.shutdown(); 
    // Wait until all is done. 
    boolean finished = false; 
    do { 
    try { 
     // Wait up to 1 second for termination. 
     finished = exec.awaitTermination(1, TimeUnit.SECONDS); 
    } catch (InterruptedException ex) { 
    } 
    } while (!finished); 
} 
+0

Umm ... de su código no se ve como productor-consumidor. De hecho, está generando un equipo de hilos, cada uno trabajando un poco, y luego se une a ellos y termina el trabajo en el hilo principal. No hay datos reales que se pasan entre los hilos. – Tudor

1

creo que, olvidándose de todo el código aquí, se pregunta exacta es la siguiente:

¿Tengo que hacer una sincronización adicional al escribir (Tema A) y lectura (Tema B) a Asegúrese de que el hilo B vea la lista como lo dejó A, o ¿AtomicReference lo resuelve?

Por lo tanto, la respuesta exacta a saber: SÍ , atómica cuidar de visibilidad. Y no es mi opinión sino la JDK documentation one:

Los efectos de memoria para los accesos y las actualizaciones de atomics generalmente siguen las reglas para volátiles, como se establece en la Especificación del lenguaje Java, tercera edición (Modelo de memoria 17.4).

Espero que esto ayude.

Cuestiones relacionadas