2012-06-11 11 views
6

Un patrón común en mi procesamiento de datos es agrupar por un conjunto de columnas, aplicar un filtro y luego aplanar nuevamente. Por ejemplo:Apache Pig: espacio prefijado de espacio de nombres (:) después de la operación de grupo

my_data_grouped = group my_data by some_column; 
my_data_grouped = filter my_data_grouped by <some expression>; 
my_data = foreach my_data_grouped flatten(my_data); 

El problema aquí es que si my_data comienza con un esquema similar (c1, c2, c3) después de esta operación tendrá un esquema similar (misdatos :: c1, c2 misdatos ::, misdatos :: c3). ¿Hay alguna manera de quitar fácilmente el prefijo "mydata ::" si las columnas son únicas?

Sé que puedo hacer algo como esto:

my_data = foreach my_data generate c1 as c1, c2 as c2, c3 as c3; 

obstante que obtiene incómoda y difícil de mantener para los conjuntos de datos con una gran cantidad de columnas y es imposible que los conjuntos de datos con columnas variables.

Respuesta

1

Puede poner la instrucción 'AS' en la misma línea que 'foreach'.

decir

my_data_grouped = group my_data by some_column; 
my_data_grouped = filter my_data_grouped by <some expression>; 
my_data = FOREACH my_data_grouped FLATTEN(my_data) AS (c1, c2, c3); 

Sin embargo, esto es sólo el mismo que hacerlo en 2 líneas, y no alivia el problema de 'conjuntos de datos con columnas variables'.

3

Si todos los campos de un esquema tienen el mismo conjunto de prefijos (por ej. Group1 :: id, group1 :: amount, etc.) puede ignorar el prefijo al hacer referencia a campos específicos (y simplemente hacer referencia a ellos como id, amount, etc)

Alternativamente, si todavía está buscando para despojar a un esquema de un solo nivel de prefijar puede utilizar una UDF como esto:

public class RemoveGroupFromTupleSchema extends EvalFunc<Tuple> { 

@Override 
public Tuple exec(Tuple input) throws IOException { 
    Tuple result = input; 
    return result; 
} 


@Override 
public Schema outputSchema(Schema input) throws FrontendException { 
    if(input.size() != 1) { 
     throw new RuntimeException("Expected input (tuple) but input does not have 1 field"); 
    } 

    List<Schema.FieldSchema> inputSchema = input.getFields(); 
    List<Schema.FieldSchema> outputSchema = new ArrayList<Schema.FieldSchema>(inputSchema); 
    for(int i = 0; i < inputSchema.size(); i++) { 
     Schema.FieldSchema thisInputFieldSchema = inputSchema.get(i); 
     String inputFieldName = thisInputFieldSchema.alias; 
     Byte dataType = thisInputFieldSchema.type; 

     String outputFieldName; 
     int findLoc = inputFieldName.indexOf("::"); 
     if(findLoc == -1) { 
      outputFieldName = inputFieldName; 
     } 
     else { 
      outputFieldName = inputFieldName.substring(findLoc+2); 
     } 
     Schema.FieldSchema thisOutputFieldSchema = new Schema.FieldSchema(outputFieldName, dataType); 
     outputSchema.set(i, thisOutputFieldSchema); 
    } 

    return new Schema(outputSchema); 
} 
} 
+0

Cómo utilizar este UDF? Gracias por adelantado. –

Cuestiones relacionadas