2010-04-26 7 views
6

He estado tratando de usar Hadoop para enviar N cantidad de líneas a una sola asignación. No necesito que las líneas se dividan ya.Varias líneas de texto en un solo mapa

He intentado utilizar NLineInputFormat, sin embargo, eso envía N líneas de texto de los datos a cada mapeador una línea a la vez [renunciar después de la línea enésima].

me han tratado de establecer la opción y sólo toma N líneas de entrada de enviarlo a 1 línea a la vez a cada mapa:

job.setInt("mapred.line.input.format.linespermap", 10); 

He encontrado una lista de correo recomendarme para anular LineRecordReader :: siguiente, sin embargo, eso no es tan simple, ya que los miembros de datos internos son todos privados.

Acabo de comprobar la fuente de NLineInputFormat y codifica Hard LineReader, por lo que anular no ayudará.

Además, por cierto estoy usando Hadoop 0.18 para compatibilidad con Amazon EC2 MapReduce.

+0

Por qué estás tratando de hacer esto? ¿Las líneas múltiples constituyen un solo registro en algún sentido? –

+0

Realmente necesito un número N de líneas aleatorias [como un conjunto], sin embargo, puedo vivir con un consequative. Necesito que lo envíe al reductor correcto. – monksy

+0

Para responder a su pregunta, sí lo hacen. – monksy

Respuesta

7

Tiene que implementar su propio formato de entrada. También tiene la posibilidad de definir su propio lector de registro.

Desafortunadamente, tiene que definir un método getSplits(). En mi opinión, esto será más difícil que implementar el lector de registros: este método tiene que implementar una lógica para dividir los datos de entrada.

Véase el siguiente extracto de "Hadoop - La guía definitiva" (un gran libro que siempre recomiendo!):

Esta es la unión:

public interface InputFormat<K, V> { 
    InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; 
    RecordReader<K, V> getRecordReader(InputSplit split, 
            JobConf job, 
            Reporter reporter) throws IOException; 
} 

El JobClient llama a los getSplits (método) , pasando el número deseado de tareas de mapa como argumento numSplits. Este número se trata como una pista, ya que las menciones InputFormat imple- son libres de devolver un número diferente de divisiones al número especificado en numSplits. Una vez calculadas las divisiones, el cliente las envía al rastreador de trabajos, que usa sus ubicaciones de almacenamiento para programar tareas de mapas y procesarlas en los rastreadores de tareas.

En un rastreador de tareas, la tarea del mapa pasa la división al método getRecordReader() en InputFormat para obtener un RecordReader para esa división. Un RecordReader es poco más que iterador sobre registros, y la tarea de mapa usa uno para generar pares de clave-valor de registro, que pasa a la función de mapa. Un fragmento de código (basado en el código de MapRunner) ilustra la idea:

K key = reader.createKey(); 
V value = reader.createValue(); 
while (reader.next(key, value)) { 
    mapper.map(key, value, output, reporter); 
} 
+0

Eso funciona. Pero eso realmente no responde la pregunta. Existe un problema con la adición de nuevos InputFormats en 18.3. – monksy

+2

Ok, lo siento. De hecho, no hay una pregunta real, ya que no veo signos de interrogación :-P ¿Qué más necesita saber más específico? –

1

Creo que en su caso se puede seguir el patrón de delegación y aplicar una envoltura alrededor de LineRecordReader que anula los métodos necesarios, es decir, al lado() (o nextKeyValue() en la API nueva) para establecer el valor de una concatenación de N líneas, en lugar de una línea.

He buscado en Google la implementación ejemplar de ParagraphRecordReader que usa LineRecordReader para leer los datos de entrada línea por línea (y concatenarlos) hasta que encuentre EOF o una línea en blanco. Luego devuelve un par, donde el valor es un párrafo (en lugar de una línea). Además, ParagraphInputFormat para este ParagraphRecordReader es tan simple como TextInputFormat estándar.

Puede encontrar los enlaces necesarios para esta implementación y un par de palabras al respecto en la siguiente publicación: http://hadoop-mapreduce.blogspot.com/2011/03/little-more-complicated-recordreaders.html.

Mejor

2

He resuelto este problema recientemente por la simple creación de mi propia InputFormat que anula NLineInputFormat e implementa una costumbre MultiLineRecordReader en lugar de la LineReader defecto.

Elegí extender NLineInputFormat porque quería tener la misma garantía de tener exactamente N línea (s) por división.

se toma Este lector registro casi como es de http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

Las únicas cosas que modificado es la propiedad para maxLineLength que ahora utiliza la nueva API, y el valor de NLINESTOPROCESS que consigue leído desde setNumLinesPerSplit() insead de ser hardcoded de NLineInputFormat (para mayor flexibilidad).

aquí está el resultado:

public class MultiLineInputFormat extends NLineInputFormat{ 
    @Override 
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) { 
     context.setStatus(genericSplit.toString()); 
     return new MultiLineRecordReader(); 
    } 

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{ 
     private int NLINESTOPROCESS; 
     private LineReader in; 
     private LongWritable key; 
     private Text value = new Text(); 
     private long start =0; 
     private long end =0; 
     private long pos =0; 
     private int maxLineLength; 

     @Override 
     public void close() throws IOException { 
      if (in != null) { 
       in.close(); 
      } 
     } 

     @Override 
     public LongWritable getCurrentKey() throws IOException,InterruptedException { 
      return key; 
     } 

     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (start == end) { 
       return 0.0f; 
      } 
      else { 
       return Math.min(1.0f, (pos - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException { 
      NLINESTOPROCESS = getNumLinesPerSplit(context); 
      FileSplit split = (FileSplit) genericSplit; 
      final Path file = split.getPath(); 
      Configuration conf = context.getConfiguration(); 
      this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE); 
      FileSystem fs = file.getFileSystem(conf); 
      start = split.getStart(); 
      end= start + split.getLength(); 
      boolean skipFirstLine = false; 
      FSDataInputStream filein = fs.open(split.getPath()); 

      if (start != 0){ 
       skipFirstLine = true; 
       --start; 
       filein.seek(start); 
      } 
      in = new LineReader(filein,conf); 
      if(skipFirstLine){ 
       start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start)); 
      } 
      this.pos = start; 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (key == null) { 
       key = new LongWritable(); 
      } 
      key.set(pos); 
      if (value == null) { 
       value = new Text(); 
      } 
      value.clear(); 
      final Text endline = new Text("\n"); 
      int newSize = 0; 
      for(int i=0;i<NLINESTOPROCESS;i++){ 
       Text v = new Text(); 
       while (pos < end) { 
        newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength)); 
        value.append(v.getBytes(),0, v.getLength()); 
        value.append(endline.getBytes(),0, endline.getLength()); 
        if (newSize == 0) { 
         break; 
        } 
        pos += newSize; 
        if (newSize < maxLineLength) { 
         break; 
        } 
       } 
      } 
      if (newSize == 0) { 
       key = null; 
       value = null; 
       return false; 
      } else { 
       return true; 
      } 
     } 
    } 

} 
Cuestiones relacionadas