2011-06-01 8 views
23

Tengo un programa que realiza muchos cálculos y los informa en un archivo con frecuencia. Sé que las operaciones de escritura frecuentes pueden retrasar mucho un programa, por lo que para evitarlo me gustaría tener un segundo hilo dedicado a las operaciones de escritura.¿Cuál es la mejor manera de escribir en un archivo en un hilo paralelo en Java?

En este momento estoy haciendo con esta clase que escribí (los impacientes pueden saltar hasta el final de la cuestión):

public class ParallelWriter implements Runnable { 

    private File file; 
    private BlockingQueue<Item> q; 
    private int indentation; 

    public ParallelWriter(File f){ 
     file = f; 
     q = new LinkedBlockingQueue<Item>(); 
     indentation = 0; 
    } 

    public ParallelWriter append(CharSequence str){ 
     try { 
      CharSeqItem item = new CharSeqItem(); 
      item.content = str; 
      item.type = ItemType.CHARSEQ; 
      q.put(item); 
      return this; 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 

    public ParallelWriter newLine(){ 
     try { 
      Item item = new Item(); 
      item.type = ItemType.NEWLINE; 
      q.put(item); 
      return this; 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 

    public void setIndent(int indentation) { 
     try{ 
      IndentCommand item = new IndentCommand(); 
      item.type = ItemType.INDENT; 
      item.indent = indentation; 
      q.put(item); 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 

    public void end(){ 
     try { 
      Item item = new Item(); 
      item.type = ItemType.POISON; 
      q.put(item); 
     } catch (InterruptedException ex) { 
      throw new RuntimeException(ex); 
     } 
    } 

    public void run() { 

     BufferedWriter out = null; 
     Item item = null; 

     try{ 
      out = new BufferedWriter(new FileWriter(file)); 
      while((item = q.take()).type != ItemType.POISON){ 
       switch(item.type){ 
        case NEWLINE: 
         out.newLine(); 
         for(int i = 0; i < indentation; i++) 
          out.append(" "); 
         break; 
        case INDENT: 
         indentation = ((IndentCommand)item).indent; 
         break; 
        case CHARSEQ: 
         out.append(((CharSeqItem)item).content); 
       } 
      } 
     } catch (InterruptedException ex){ 
      throw new RuntimeException(ex); 
     } catch (IOException ex) { 
      throw new RuntimeException(ex); 
     } finally { 
      if(out != null) try { 
       out.close(); 
      } catch (IOException ex) { 
       throw new RuntimeException(ex); 
      } 
     } 
    } 

    private enum ItemType { 
     CHARSEQ, NEWLINE, INDENT, POISON; 
    } 
    private static class Item { 
     ItemType type; 
    } 
    private static class CharSeqItem extends Item { 
     CharSequence content; 
    } 
    private static class IndentCommand extends Item { 
     int indent; 
    } 
} 

Y luego utilizarlo haciendo:

ParallelWriter w = new ParallelWriter(myFile); 
new Thread(w).start(); 

/// Lots of 
w.append(" things ").newLine(); 
w.setIndent(2); 
w.newLine().append(" more things "); 

/// and finally 
w.end(); 

Si bien esto funciona perfectamente bien, me pregunto: ¿Hay una mejor manera de lograr esto?

+0

Pregunta similar: http://stackoverflow.com/questions/8602466/can-multi-threads-write-data-into-a-file-at-the-same-time – Vadzim

Respuesta

14

Su enfoque básico se ve bien. Me gustaría estructurar el código de la siguiente manera:

import java.io.BufferedWriter; 
import java.io.File; 
import java.io.IOException; 
import java.io.Writer; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.TimeUnit; 

public interface FileWriter { 
    FileWriter append(CharSequence seq); 

    FileWriter indent(int indent); 

    void close(); 
} 

class AsyncFileWriter implements FileWriter, Runnable { 
    private final File file; 
    private final Writer out; 
    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<Item>(); 
    private volatile boolean started = false; 
    private volatile boolean stopped = false; 

    public AsyncFileWriter(File file) throws IOException { 
     this.file = file; 
     this.out = new BufferedWriter(new java.io.FileWriter(file)); 
    } 

    public FileWriter append(CharSequence seq) { 
     if (!started) { 
      throw new IllegalStateException("open() call expected before append()"); 
     } 
     try { 
      queue.put(new CharSeqItem(seq)); 
     } catch (InterruptedException ignored) { 
     } 
     return this; 
    } 

    public FileWriter indent(int indent) { 
     if (!started) { 
      throw new IllegalStateException("open() call expected before append()"); 
     } 
     try { 
      queue.put(new IndentItem(indent)); 
     } catch (InterruptedException ignored) { 
     } 
     return this; 
    } 

    public void open() { 
     this.started = true; 
     new Thread(this).start(); 
    } 

    public void run() { 
     while (!stopped) { 
      try { 
       Item item = queue.poll(100, TimeUnit.MICROSECONDS); 
       if (item != null) { 
        try { 
         item.write(out); 
        } catch (IOException logme) { 
        } 
       } 
      } catch (InterruptedException e) { 
      } 
     } 
     try { 
      out.close(); 
     } catch (IOException ignore) { 
     } 
    } 

    public void close() { 
     this.stopped = true; 
    } 

    private static interface Item { 
     void write(Writer out) throws IOException; 
    } 

    private static class CharSeqItem implements Item { 
     private final CharSequence sequence; 

     public CharSeqItem(CharSequence sequence) { 
      this.sequence = sequence; 
     } 

     public void write(Writer out) throws IOException { 
      out.append(sequence); 
     } 
    } 

    private static class IndentItem implements Item { 
     private final int indent; 

     public IndentItem(int indent) { 
      this.indent = indent; 
     } 

     public void write(Writer out) throws IOException { 
      for (int i = 0; i < indent; i++) { 
       out.append(" "); 
      } 
     } 
    } 
} 

Si no quiere escribir en un hilo separado (? Tal vez en una prueba), puede tener una implementación de FileWriter que exige append en el Writer de la persona que llama hilo.

+0

Gracias, la delegación de tareas específicas de elementos a los artículos está más en línea con OOP que cómo lo estaba haciendo. Además, ¿hay una ventaja particular de usar 'this.stopped' para finalizar la lectura en comparación con un elemento de veneno? – trutheality

+0

También su operación de sangría hace algo ligeramente diferente: Mi sangría establece la sangría para todas las líneas futuras, la suya solo sangra en el lugar actual. – trutheality

+0

@trutheality Utilicé la variable 'stopped' porque es un modismo estándar detener un hilo cooperativamente. Además, puede usarlo para evitar que se llame a 'append' después de llamar a' end'. Entendí mal la funcionalidad de la operación de sangría en el código original publicado. –

6

Usar una LinkedBlockingQueue es una muy buena idea. No estoy seguro de que me guste parte del estilo del código ... pero el principio parece sólido.

Tal vez agregue una capacidad a LinkedBlockingQueue igual a un cierto% de su memoria total ... digamos 10,000 elementos ... de esta manera si su escritura va demasiado lenta, sus hilos de trabajo no seguirán agregando más trabajo hasta el montón está volado.

+0

Estoy de acuerdo con esta respuesta. – Joseph

+0

Tiene razón, agregar una capacidad es una buena idea. – trutheality

3

Una buena forma de intercambiar datos con un único hilo de consumidor es utilizar un Intercambiador.

Puede usar un StringBuilder o ByteBuffer como el búfer para intercambiar con el hilo de fondo. La latencia en la que se incurre puede ser de alrededor de 1 microsegundo, no implica la creación de ningún objeto y es menor con un BlockingQueue.

Del ejemplo que creo que vale la pena repetir aquí.

class FillAndEmpty { 
    Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>(); 
    DataBuffer initialEmptyBuffer = ... a made-up type 
    DataBuffer initialFullBuffer = ... 

    class FillingLoop implements Runnable { 
    public void run() { 
     DataBuffer currentBuffer = initialEmptyBuffer; 
     try { 
     while (currentBuffer != null) { 
      addToBuffer(currentBuffer); 
      if (currentBuffer.isFull()) 
      currentBuffer = exchanger.exchange(currentBuffer); 
     } 
     } catch (InterruptedException ex) { ... handle ... } 
    } 
    } 

    class EmptyingLoop implements Runnable { 
    public void run() { 
     DataBuffer currentBuffer = initialFullBuffer; 
     try { 
     while (currentBuffer != null) { 
      takeFromBuffer(currentBuffer); 
      if (currentBuffer.isEmpty()) 
      currentBuffer = exchanger.exchange(currentBuffer); 
     } 
     } catch (InterruptedException ex) { ... handle ...} 
    } 
    } 

    void start() { 
    new Thread(new FillingLoop()).start(); 
    new Thread(new EmptyingLoop()).start(); 
    } 
} 
+0

Gracias por la idea, ciertamente me enseñó algo nuevo. No sé si ese es el camino correcto en mi caso: realmente no quiero que el productor espere al consumidor, lo que parece ser necesario aquí. – trutheality

+0

@truheality, el productor solo esperará si el consumidor no puede mantener el ritmo. En este caso, tiene un problema que una cola puede ocultar. Una vez que una cola es demasiado larga, su rendimiento puede sufrir de formas impredecibles. –

1

sé que las operaciones de escritura frecuente puede ralentizar un programa de abajo mucho

Probablemente no tanto como usted piensa, siempre y cuando use el almacenamiento en búfer.

Cuestiones relacionadas