2012-06-14 8 views
8

Estoy pensando en utilizar HBase como fuente para uno de mis trabajos de MapReduce. Sé que TableInputFormat especifica una división de entrada (y, por lo tanto, un asignador) por región. Sin embargo, esto parece ineficiente. Realmente me gustaría tener varios mapeadores trabajando en una determinada región a la vez. ¿Puedo lograr esto ampliando TableInputFormatBase? ¿Puedes indicarme un ejemplo? Además, ¿es esto una buena idea?Cuando uso HBase como fuente para MapReduce, ¿puedo extender TableInputFormatBase para crear divisiones múltiples y mapeadores múltiples para cada región?

Gracias por la ayuda.

Respuesta

1

No estoy seguro si se pueden especificar varios creadores de mapas para una región dada, pero tenga en cuenta lo siguiente:

Si cree que un asignador es ineficiente por región (tal vez sus nodos de datos no tienen suficientes recursos como #cpus) , quizás pueda especificar tamaños de regiones más pequeños en el archivo hbase-site.xml.

aquí es un sitio para las opciones configs por defecto si desea buscar en el cambio que: http://hbase.apache.org/configuration.html#hbase_default_configurations

tenga en cuenta que al hacer que el tamaño de la región pequeña, le aumenta el número de archivos de la DFS, y esto puede limitar la capacidad de su hadoop DFS dependiendo de la memoria de su namenode. Recuerde, el uso de la memoria del namenode está directamente relacionado con la cantidad de archivos en su DFS. Esto puede o no ser relevante para su situación, ya que no sé cómo se está utilizando su clúster. ¡Nunca hay una solución milagrosa para estas preguntas!

-1

Esto sería útil si desea escanear regiones grandes (cientos de millones de filas) con escaneo acondicionado que solo encuentra unos pocos registros. Esto evitará de ScannerTimeoutException

package org.apache.hadoop.hbase.mapreduce; 

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 

public class RegionSplitTableInputFormat extends TableInputFormat { 

    public static final String REGION_SPLIT = "region.split"; 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 

     Configuration conf = context.getConfiguration(); 
     int regionSplitCount = conf.getInt(REGION_SPLIT, 0); 
     List<InputSplit> superSplits = super.getSplits(context); 
     if (regionSplitCount <= 0) { 
      return superSplits; 
     } 

     List<InputSplit> splits = new ArrayList<InputSplit>(superSplits.size() * regionSplitCount); 

     for (InputSplit inputSplit : superSplits) { 
      TableSplit tableSplit = (TableSplit) inputSplit; 
      System.out.println("splitting by " + regionSplitCount + " " + tableSplit); 
      byte[] startRow0 = tableSplit.getStartRow(); 
      byte[] endRow0 = tableSplit.getEndRow(); 
      boolean discardLastSplit = false; 
      if (endRow0.length == 0) { 
       endRow0 = new byte[startRow0.length]; 
       Arrays.fill(endRow0, (byte) 255); 
       discardLastSplit = true; 
      } 
      byte[][] split = Bytes.split(startRow0, endRow0, regionSplitCount); 
      if (discardLastSplit) { 
       split[split.length - 1] = new byte[0]; 
      } 
      for (int regionSplit = 0; regionSplit < split.length - 1; regionSplit++) { 
       byte[] startRow = split[regionSplit]; 
       byte[] endRow = split[regionSplit + 1]; 
       TableSplit newSplit = new TableSplit(tableSplit.getTableName(), startRow, endRow, 
         tableSplit.getLocations()[0]); 
       splits.add(newSplit); 
      } 
     } 

     return splits; 
    } 
} 
0

1. Está absolutamente bien solo asegúrate de que el conjunto de claves sea mutuamente exclusivo entre los mapeadores.

  1. te enviaban crear demasiados clientes ya que esto puede dar lugar a gran cantidad de GC, ya que durante leer hbase hbase batido bloque de caché sucede
0

El uso de este MultipleScanTableInputFormat, puede utilizar Configuración MultipleScanTableInputFormat.PARTITIONS_PER_REGION_SERVER para controlar cuántos mapeadores se deberían ejecutar contra un solo servidor de regiones. La clase agrupará todas las divisiones de entrada por su ubicación (servidor de regiones), y el lector de registros repetirá adecuadamente todas las divisiones agregadas para el asignador.

Aquí está el ejemplo

https://gist.github.com/bbeaudreault/9788499#file-multiplescantableinputformat-java-L90

que el trabajo que ha creado las múltiples escisiones agregados para un único asignador

private List<InputSplit> getAggregatedSplits(JobContext context) throws IOException { 
final List<InputSplit> aggregatedSplits = new ArrayList<InputSplit>(); 

final Scan scan = getScan(); 

for (int i = 0; i < startRows.size(); i++) { 
    scan.setStartRow(startRows.get(i)); 
    scan.setStopRow(stopRows.get(i)); 

    setScan(scan); 

    aggregatedSplits.addAll(super.getSplits(context)); 
} 

// set the state back to where it was.. 
scan.setStopRow(null); 
scan.setStartRow(null); 

setScan(scan); 

return aggregatedSplits; 
} 

Crear partición servidor Región

@Override 
public List<InputSplit> getSplits(JobContext context) throws IOException { 
List<InputSplit> source = getAggregatedSplits(context); 

if (!partitionByRegionServer) { 
    return source; 
} 

// Partition by regionserver 
Multimap<String, TableSplit> partitioned = ArrayListMultimap.<String, TableSplit>create(); 
for (InputSplit split : source) { 
    TableSplit cast = (TableSplit) split; 
    String rs = cast.getRegionLocation(); 

    partitioned.put(rs, cast); 
} 
Cuestiones relacionadas