Decir que tengo un AtomicReference
a una lista de objetos:AtomicReference a un objeto mutable y visibilidad
AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>());
Tema A añade elementos a esta lista: batch.get().add(o);
Más tarde, hilo B toma la lista y, por ejemplo, la almacena en un DB: insertBatch(batch.get());
¿Tengo que hacer una sincronización adicional al escribir (Thr ead A) y lectura (Hilo B) para asegurar que el hilo B vea la lista de la manera en que A lo dejó, o ¿esto es atendido por AtomicReference?
En otras palabras: si tengo una AtomicReference para un objeto mutable, y un hilo cambia ese objeto, ¿otros hilos ven este cambio inmediatamente?
Editar:
Tal vez un código de ejemplo está en orden:
public void process(Reader in) throws IOException {
List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>();
ExecutorService exec = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; ++i) {
tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() {
@Override public AtomicReference<List<Object>> call() throws IOException {
final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize));
Processor.this.parser.parse(in, new Parser.Handler() {
@Override public void onNewObject(Object event) {
batch.get().add(event);
if (batch.get().size() >= batchSize) {
dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
}
}
});
return batch;
}
}));
}
List<Object> remainingBatches = new ArrayList<Object>();
for (Future<AtomicReference<List<Object>>> task : tasks) {
try {
AtomicReference<List<Object>> remainingBatch = task.get();
remainingBatches.addAll(remainingBatch.get());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw (RuntimeException)cause;
}
}
// these haven't been flushed yet by the worker threads
if (!remainingBatches.isEmpty()) {
dao.insertBatch(remainingBatches);
}
}
Lo que sucede aquí es que puedo crear cuatro subprocesos de trabajo para analizar un texto (este es el parámetro Reader in
al método process()
) . Cada trabajador guarda las líneas que ha analizado en un lote y vacía el lote cuando está lleno (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));
).
Dado que el número de líneas en el texto no es un múltiplo del tamaño del lote, los últimos objetos terminan en un lote que no se vacía, ya que no está lleno. Por lo tanto, estos lotes restantes se insertan por el hilo principal.
Yo uso AtomicReference.getAndSet()
para reemplazar el lote completo por uno vacío. Es este programa correcto en cuanto a enhebrar?
De acuerdo, agregué un código de ejemplo a mi pregunta anterior. Yo uso 'AtomicReference.getAndSet()' para reemplazar un lote completo por uno nuevo y vacío. ¿Todavía necesito una sincronización adicional? –
Sí, su código es correcto, aunque el uso de 'AtomicReference' no parece ser necesario aquí. – Tudor
@Tudor estaba pensando exactamente lo mismo.De hecho, es posible que getAndSet() no haga lo que quiere porque obtendrá el valor actual y luego cambiará el valor de AtomicReference. –