2011-06-24 17 views
5

Lo siento por la publicación cruzada en la lista de correo del usuario de hadoop y aquí, pero esto me está resultando un asunto urgente.Computing establece la intersección y establece la diferencia de los registros de dos archivos con hadoop

Mi problema es el siguiente: Tengo dos archivos de entrada, y yo quiero para determinar

  • a) El número de líneas que sólo se producen en el archivo 1
  • b) El número de líneas que sólo se producen en el archivo 2
  • c) El número de líneas comunes a ambos (por ejemplo, en lo que se refiere a la igualdad cadena)

ejemplo:

File 1: 
a 
b 
c 

File 2: 
a 
d 

salida deseada para cada caso:

lines_only_in_1: 2   (b, c) 
lines_only_in_2: 1   (d) 
lines_in_both: 1   (a) 

Básicamente mi método es el siguiente: escribí mi propia LineRecordReader, por lo que el asignador recibe un par formado por la línea (texto) y un byte indicando el archivo de origen (ya sea 0 o 1). El asignador solo devuelve el par nuevamente, así que en realidad no hace nada. Sin embargo, el efecto secundario es, que el combinador recibe una

Map<Line, Iterable<SourceId>> 

(donde sourceid es o bien 0 o 1).

Ahora, para cada línea que puedo conseguir el conjunto de fuentes que aparece. Por lo tanto, podría escribir un combinador que cuenta para cada caso (a, b, c) el número de líneas (Listado 1)

El combinador genera un 'resumen' solo en la limpieza (¿es seguro?). Así que este resumen se parece a:

lines_only_in_1 2531 
lines_only_in_2 3190 
lines_in_both  901 

En el reductor que entonces sólo resumir los valores de estos resúmenes. (Entonces, la salida del reductor se ve igual que la del combinador).

Sin embargo, el principal problema es, que necesito para tratar tanto los archivos de origen como un solo archivo virtual que los registros de rendimiento de la forma (línea, sourceid) // sourceid 0 o 1

Y estoy no estoy seguro de cómo lograr eso. Así que la pregunta es si puedo evitar el preprocesamiento y la fusión de los archivos de antemano, y hacerlo sobre la marcha con algo así como un lector de archivos y lector de registro personalizado. Cualquier código es muy apreciado.

Saludos, Noel

Listado 1:

public static class SourceCombiner 
    extends Reducer<Text, ByteWritable, Text, LongWritable> { 

    private long countA = 0; 
    private long countB = 0; 
    private long countC = 0; // C = lines (c)ommon to both sources 

    @Override 
    public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException { 
     Set<Byte> fileIds = new HashSet<Byte>(); 
     for (ByteWritable val : values) { 
      byte fileId = val.get(); 

      fileIds.add(fileId); 
     } 

     if(fileIds.contains((byte)0)) { ++countA; } 
     if(fileIds.contains((byte)1)) { ++countB; } 
     if(fileIds.size() >= 2) { ++countC; } 
    } 

    protected void cleanup(Context context) 
      throws java.io.IOException, java.lang.InterruptedException 
    { 
     context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA)); 
     context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB)); 
     context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC)); 
    } 
} 

Respuesta

2

Está bien, hay que admitir que realmente no coger la esencia de lo que ha intentado hasta ahora, pero tengo una enfoque simple para hacer las cosas que pueda necesitar.

Eche un vistazo al filemapper. Éste obtendrá el nombre del archivo y lo enviará con cada línea de la entrada.

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> { 

     static Text fileName; 

     @Override 
     protected void map(LongWritable key, Text value, Context context) 
       throws IOException, InterruptedException { 
      context.write(value, fileName); 
     } 

     @Override 
     protected void setup(Context context) throws IOException, 
       InterruptedException { 

      String name = ((FileSplit) context.getInputSplit()).getPath().getName(); 
      fileName = new Text(name); 
     } 
    } 

Ahora tenemos un montón de/valores clave que se parecen a esto (en lo que se refiere a su ejemplo)

a File 1 
    b File 1 
    c File 1 

    a File 2 
    d File 2 

Obviamente reduciéndolos le conseguirán una entrada como esta:

a File 1,File 2 
    b File 1 
    c File 1 
    d File 2 

Lo que hay que hacer en su reductor podría tener este aspecto:

public class FileReducer extends Reducer<Text, Text, Text, Text> { 

    enum Counter { 
     LINES_IN_COMMON, LINES_IN_FIRST, LINES_IN_SECOND 
    } 

    @Override 
    protected void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     HashSet<String> set = new HashSet<String>(); 
     for (Text t : values) { 
      set.add(t.toString()); 
     } 

     // if we have only two files and we have just two records in our hashset 
     // the line is contained in both files 
     if (set.size() == 2) { 
      context.getCounter(Counter.LINES_IN_COMMON).increment(1); 
     } else { 
      // sorry this is a bit dirty... 
      String t = set.iterator().next(); 
      // determine which file it was by checking for the name: 
      if(t.toString().equals("YOUR_FIRST_FILE_NAME")){ 
       context.getCounter(Counter.LINES_IN_FIRST).increment(1); 
      } else { 
       context.getCounter(Counter.LINES_IN_SECOND).increment(1); 
      } 
     } 
    } 

} 

Debe reemplazar la cadena dentro de la instrucción if de sus nombres de archivo.

Creo que usar el contador de trabajo es un poco más claro que usar primitivas propias y escribirlas en el contexto de la limpieza. Puede recuperar los contadores para un trabajo llamando a estas cosas después de la finalización:

Job job = new Job(new Configuration()); 
//setup stuff etc omitted.. 
job.waitForCompletion(true); 
// do the same line with the other enums 
long linesInCommon = job.getCounters().findCounter(Counter.LINES_IN_COMMON).getValue(); 
Nunca

embargo, si necesita los números de las líneas en común, etc en sus HDFS, y luego ir por su solución.

Espero que te haya ayudado.

+0

Hola, estaba un poco confuso: El punto es que quiero que los combinadores solo entreguen el resumen (el número de líneas en 1, 2 y común) al reductor, no es necesario que todas las líneas se envían de vuelta al reductor. Pero para que esto funcione, los combinadores deben ver los registros de ambos archivos juntos (mi RecordReader ya produce pares (línea, archivoId); la asignación de nombrearchivo a archivoID se transmite con el objeto config). Sin embargo, al agregar los archivos con dos declaraciones FileInputFormat.addInputPath (trabajo, archivo), los archivos se procesan individualmente, por lo que los combinadores no ven su 'unión'. –

+0

puh que es una "optimización" realmente extraña. Pero buen punto. –

+0

Sry para la respuesta tardía; ¿Es posible que mi idea no funcione? El archivo de origen se divide y las divisiones se envían a los nodos. Los nodos luego leen los registros de su división correspondiente. Por lo tanto, los registros duplicados en el archivo fuente pueden ubicarse en varias divisiones y, por lo tanto, distribuirse en varios nodos. Por lo tanto, obtener una agrupación de los duplicados solo es posible en el reductor. ¿Es esto correcto? –

Cuestiones relacionadas