2012-07-02 10 views
5

Me gusta generar múltiples tuplas desde una sola tupla. Lo que quiero decir es: Tengo un archivo con los siguientes datos.División de una tupla en múltiples tuplas en Pig

>> cat data 
ID | ColumnName1:Value1 | ColumnName2:Value2 

así que cargarlo por el siguiente comando

grunt >> A = load '$data' using PigStorage('|');  
grunt >> dump A;  
(ID,ColumnName1:Value1,ColumnName2:Value2) 

Ahora quieren dividir esta tupla en dos tuplas.

(ID, ColumnName1, Value1) 
(ID, ColumnName2, Value2) 

¿Puedo usar UDF junto con foreach y generar? Algo como el siguiente?

grunt >> foreach A generate SOMEUDF(A) 

EDIT:

tupla de entrada: (id1, columna 1, columna2) de salida: dos tuplas (id1, COLUMN1) y (id2, columna2) por lo que es lista o debería devolver una bolsa?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>> 
{ 
    public List<Tuple> exec(Tuple input) throws IOException { 
     if (input == null || input.size() == 0) 
      return null; 
     try{ 
      // not sure how whether I can create tuples on my own. Looks like I should use TupleFactory. 
      // return list of tuples. 
     }catch(Exception e){ 
      throw WrappedIOException.wrap("Caught exception processing input row ", e); 
     } 
    } 
} 

¿Es correcto este enfoque?

Respuesta

10

Puede escribir una UDF o usar una secuencia de comandos PIG con funciones integradas.

Por ejemplo:

-- data should be chararray, PigStorage('|') return bytearray which will not work for this example 
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 

-- split by | and create a row so we can dereference it later 
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ; 

-- first column is id, rest is converted into a bag and flatten it to make rows 
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value; 
-- there will be records with (id, id), but id should not have ':' 
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; 
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val); 
dump final; 

Test de entrada:

1|c1:11:33|c2:12 
234|c1:21|c2:22 
33|c1:31|c2:32 
345|c1:41|c2:42 

SALIDA

(1,c1,11:33) 
(1,c2,12) 
(234,c1,21) 
(234,c2,22) 
(33,c1,31) 
(33,c2,32) 
(345,c1,41) 
(345,c2,42) 

espero que ayude.

Saludos.

+0

Muchas gracias. ¿Puedo hacer lo mismo escribiendo un UDF? Actualizo la pregunta. – FourOfAKind

+0

Sí, puedes. Ver la siguiente respuesta. – alexeipab

+0

Su gran ayuda. Gracias por tu tiempo. – FourOfAKind

6

Aquí está la versión UDF. Yo prefiero volver una bolsa:

import java.io.IOException; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 

/** 
* Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
* {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...} 
* 
* Default rows separator is '|' and key value separator is ':'. 
* In this implementation white spaces around separator characters are not removed. 
* ID can be made of any character (including sequence of white spaces). 
* @author 
* 
*/ 
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> { 

    private static final TupleFactory tupleFactory = TupleFactory.getInstance(); 
    private static final BagFactory bagFactory = BagFactory.getInstance(); 

    //Row separator character. Default is '|'. 
    private String rowsSeparator; 
    //Column value separator character. Default i 
    private String columnValueSeparator; 

    public TupleToBagColumnValuePairs() { 
     this.rowsSeparator = "\\|"; 
     this.columnValueSeparator = ":"; 
    } 

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) { 
     this.rowsSeparator = rowsSeparator; 
     this.columnValueSeparator = keyValueSeparator; 
    } 

    /** 
    * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray) 
    * @param outputBag Output tuples (id, column, value) are added to this bag 
    * @param id 
    * @param column 
    * @param value 
    * @throws ExecException 
    */ 
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException { 
     Tuple outputTuple = tupleFactory.newTuple(); 
     outputTuple.append(id); 
     outputTuple.append(column); 
     outputTuple.append(value); 
     outputBag.add(outputTuple); 
    } 

    /** 
    * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value) 
    * @param outputBag Output tuples (id, column, value) should be added to this bag 
    * @param id 
    * @param splitInputLine format column{separator}value, which start from index 1 
    * @throws ExecException 
    */ 
    protected void parseColumnValues(DataBag outputBag, String id, 
      String[] splitInputLine) throws ExecException { 
     for (int i = 1; i < splitInputLine.length; i++) { 
      if (splitInputLine[i] != null) { 
       int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator); 
       if (columnValueSplitIndex != -1) { 
        String column = splitInputLine[i].substring(0, columnValueSplitIndex); 
        String value = null; 
        if (columnValueSplitIndex + 1 < splitInputLine[i].length()) { 
         value = splitInputLine[i].substring(columnValueSplitIndex + 1); 
        } 
        this.addTuple(outputBag, id, column, value); 
       } else { 
        String column = splitInputLine[i]; 
        this.addTuple(outputBag, id, column, null); 
       } 
      } 
     } 
    } 

    /** 
    * input - contains only one field of type chararray, which will be split by '|' 
    * All inputs that are: null or of length 0 are ignored. 
    */ 
    @Override 
    public DataBag exec(Tuple input) throws IOException { 
     if (input == null || input.size() != 1 || input.isNull(0)) { 
      return null; 
     } 

     String inputLine = (String)input.get(0); 
     String[] splitInputLine = inputLine.split(this.rowsSeparator, -1); 

     if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) { 
      String id = splitInputLine[0]; 
      DataBag outputBag = bagFactory.newDefaultBag();    
      if (splitInputLine.length == 1) { // there is just an id in the line 
       this.addTuple(outputBag, id, null, null); 
      } else { 
       this.parseColumnValues(outputBag, id, splitInputLine); 
      } 


      return outputBag; 
     } 
     return null; 
    } 

    @Override 
    public Schema outputSchema(Schema input) { 
     try { 
      if (input.size() != 1) { 
       throw new RuntimeException("Expected input to have only one field"); 
      } 

      Schema.FieldSchema inputFieldSchema = input.getField(0); 
      if (inputFieldSchema.type != DataType.CHARARRAY) { 
       throw new RuntimeException("Expected a CHARARRAY as input"); 
      } 

      Schema tupleSchema = new Schema(); 
      tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY)); 

      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG)); 
     } catch (FrontendException exx) { 
      throw new RuntimeException(exx); 
     } 
    } 

} 

Así es como se utiliza en PIG:

register 'path to the jar'; 
define IdColumnValue myPackage.TupleToBagColumnValuePairs(); 

inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2); 
dump result; 

Una buena fuente de inspiración para la escritura UDF con bolsas ven DataFu source code by LinkedIn

0

usted podría utilizar TransposeTupleToBag (UDF de DataFu lib) en la salida de STRSPLIT para obtener la bolsa, y luego FLATTEN la bolsa para crear una fila separada por columna original.

Cuestiones relacionadas