2012-06-16 9 views
8

Tengo un trabajo con haddop que su salida debe escribirse en HBase. Realmente no necesito reductor, el tipo de fila que me gustaría insertar está determinado en el Mapper.Hadoop - Escribiendo a HBase directamente desde el Mapper

¿Cómo puedo usar TableOutputFormat para lograr esto? De todos los ejemplos que he visto, la suposición es que el reductor es el que crea el Put, y que TableMapper es solo para leer de la tabla HBase.

En mi caso, la entrada es HDFS, la salida está puesta en una tabla específica, no puedo encontrar nada en TableMapReduceUtil que pueda ayudarme con eso tampoco.

¿Hay algún ejemplo que me ayude con eso?

Por cierto, estoy usando la nueva API de Hadoop

+0

¿Cuántos registros está tratando de insertar? – Gevorg

Respuesta

1

Sólo tiene que hacer la salida asignador de la pareja. OutputFormat solo especifica cómo persistir los pares clave-valor de salida. No necesariamente significa que los valores clave provienen del reductor. Usted tendría que hacer algo como esto en el mapeador:

... extends TableMapper<ImmutableBytesWritable, Put>() { 
    ... 
    ... 
    context.write(<some key>, <some Put or Delete object>); 
} 
7

Este es el ejemplo de leer el archivo y poner todas las líneas hacia hbase. Este ejemplo es de "Hbase: la guía definitiva" y puede encontrarlo en el repositorio. Para conseguirlo solo clon de recompra en su equipo:

git clone git://github.com/larsgeorge/hbase-book.git 

En este libro también se puede encontrar todas las explicaciones sobre el código. Pero si algo es incomprensible para usted, siéntase libre de preguntar.

` public class ImportFromFile { 
    public static final String NAME = "ImportFromFile"; 
    public enum Counters { LINES } 

    static class ImportMapper 
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { 
     private byte[] family = null; 
     private byte[] qualifier = null; 

     @Override 
     protected void setup(Context context) 
     throws IOException, InterruptedException { 
     String column = context.getConfiguration().get("conf.column"); 
     byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); 
     family = colkey[0]; 
     if (colkey.length > 1) { 
      qualifier = colkey[1]; 
     } 
     } 

     @Override 
     public void map(LongWritable offset, Text line, Context context) 
     throws IOException { 
      try { 
      String lineString = line.toString(); 
      byte[] rowkey = DigestUtils.md5(lineString); 
      Put put = new Put(rowkey); 
      put.add(family, qualifier, Bytes.toBytes(lineString)); 
      context.write(new ImmutableBytesWritable(rowkey), put); 
      context.getCounter(Counters.LINES).increment(1); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     } 
    } 

    private static CommandLine parseArgs(String[] args) throws ParseException { 
     Options options = new Options(); 
     Option o = new Option("t", "table", true, 
     "table to import into (must exist)"); 
     o.setArgName("table-name"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("c", "column", true, 
     "column to store row data into (must exist)"); 
     o.setArgName("family:qualifier"); 
     o.setRequired(true); 
     options.addOption(o); 
     o = new Option("i", "input", true, 
     "the directory or file to read from"); 
     o.setArgName("path-in-HDFS"); 
     o.setRequired(true); 
     options.addOption(o); 
     options.addOption("d", "debug", false, "switch on DEBUG log level"); 
     CommandLineParser parser = new PosixParser(); 
     CommandLine cmd = null; 
     try { 
     cmd = parser.parse(options, args); 
     } catch (Exception e) { 
     System.err.println("ERROR: " + e.getMessage() + "\n"); 
     HelpFormatter formatter = new HelpFormatter(); 
     formatter.printHelp(NAME + " ", options, true); 
     System.exit(-1); 
     } 
     return cmd; 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = HBaseConfiguration.create(); 
     String[] otherArgs = 
     new GenericOptionsParser(conf, args).getRemainingArgs(); 
     CommandLine cmd = parseArgs(otherArgs); 
     String table = cmd.getOptionValue("t"); 
     String input = cmd.getOptionValue("i"); 
     String column = cmd.getOptionValue("c"); 
     conf.set("conf.column", column); 
     Job job = new Job(conf, "Import from file " + input + " into table " + table); 

      job.setJarByClass(ImportFromFile.class); 
     job.setMapperClass(ImportMapper.class); 
     job.setOutputFormatClass(TableOutputFormat.class); 
     job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table); 
     job.setOutputKeyClass(ImmutableBytesWritable.class); 
     job.setOutputValueClass(Writable.class); 
     job.setNumReduceTasks(0); 
     FileInputFormat.addInputPath(job, new Path(input)); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
    }` 
+1

Me aparece lo siguiente: 'Excepción de contenedor-lanzamiento: org.apache.hadoop.util.Shell $ ExitCodeException' ¿Te encontraste con este problema también con el código anterior? Estoy usando Hadoop2.4 y Hbase0.94.18 – Gevorg

Cuestiones relacionadas