Lo siento por la publicación cruzada en la lista de correo del usuario de hadoop y aquí, pero esto me está resultando un asunto urgente.Computing establece la intersección y establece la diferencia de los registros de dos archivos con hadoop
Mi problema es el siguiente: Tengo dos archivos de entrada, y yo quiero para determinar
- a) El número de líneas que sólo se producen en el archivo 1
- b) El número de líneas que sólo se producen en el archivo 2
- c) El número de líneas comunes a ambos (por ejemplo, en lo que se refiere a la igualdad cadena)
ejemplo:
File 1:
a
b
c
File 2:
a
d
salida deseada para cada caso:
lines_only_in_1: 2 (b, c)
lines_only_in_2: 1 (d)
lines_in_both: 1 (a)
Básicamente mi método es el siguiente: escribí mi propia LineRecordReader, por lo que el asignador recibe un par formado por la línea (texto) y un byte indicando el archivo de origen (ya sea 0 o 1). El asignador solo devuelve el par nuevamente, así que en realidad no hace nada. Sin embargo, el efecto secundario es, que el combinador recibe una
Map<Line, Iterable<SourceId>>
(donde sourceid es o bien 0 o 1).
Ahora, para cada línea que puedo conseguir el conjunto de fuentes que aparece. Por lo tanto, podría escribir un combinador que cuenta para cada caso (a, b, c) el número de líneas (Listado 1)
El combinador genera un 'resumen' solo en la limpieza (¿es seguro?). Así que este resumen se parece a:
lines_only_in_1 2531
lines_only_in_2 3190
lines_in_both 901
En el reductor que entonces sólo resumir los valores de estos resúmenes. (Entonces, la salida del reductor se ve igual que la del combinador).
Sin embargo, el principal problema es, que necesito para tratar tanto los archivos de origen como un solo archivo virtual que los registros de rendimiento de la forma (línea, sourceid) // sourceid 0 o 1
Y estoy no estoy seguro de cómo lograr eso. Así que la pregunta es si puedo evitar el preprocesamiento y la fusión de los archivos de antemano, y hacerlo sobre la marcha con algo así como un lector de archivos y lector de registro personalizado. Cualquier código es muy apreciado.
Saludos, Noel
Listado 1:
public static class SourceCombiner
extends Reducer<Text, ByteWritable, Text, LongWritable> {
private long countA = 0;
private long countB = 0;
private long countC = 0; // C = lines (c)ommon to both sources
@Override
public void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
Set<Byte> fileIds = new HashSet<Byte>();
for (ByteWritable val : values) {
byte fileId = val.get();
fileIds.add(fileId);
}
if(fileIds.contains((byte)0)) { ++countA; }
if(fileIds.contains((byte)1)) { ++countB; }
if(fileIds.size() >= 2) { ++countC; }
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException
{
context.write(new Text("in_a_distinct_count_total"), new LongWritable(countA));
context.write(new Text("in_b_distinct_count_total"), new LongWritable(countB));
context.write(new Text("out_common_distinct_count_total"), new LongWritable(countC));
}
}
Hola, estaba un poco confuso: El punto es que quiero que los combinadores solo entreguen el resumen (el número de líneas en 1, 2 y común) al reductor, no es necesario que todas las líneas se envían de vuelta al reductor. Pero para que esto funcione, los combinadores deben ver los registros de ambos archivos juntos (mi RecordReader ya produce pares (línea, archivoId); la asignación de nombrearchivo a archivoID se transmite con el objeto config). Sin embargo, al agregar los archivos con dos declaraciones FileInputFormat.addInputPath (trabajo, archivo), los archivos se procesan individualmente, por lo que los combinadores no ven su 'unión'. –
puh que es una "optimización" realmente extraña. Pero buen punto. –
Sry para la respuesta tardía; ¿Es posible que mi idea no funcione? El archivo de origen se divide y las divisiones se envían a los nodos. Los nodos luego leen los registros de su división correspondiente. Por lo tanto, los registros duplicados en el archivo fuente pueden ubicarse en varias divisiones y, por lo tanto, distribuirse en varios nodos. Por lo tanto, obtener una agrupación de los duplicados solo es posible en el reductor. ¿Es esto correcto? –