2011-03-27 12 views
12

Necesito acceder a los contadores de mi mapeador en mi reductor. es posible? ¿Si es así, Cómo se hace?Accediendo al contador de un mapeador desde un reductor

A modo de ejemplo: mi asignador es:

public class CounterMapper extends Mapper<Text,Text,Text,Text> { 

    static enum TestCounters { TEST } 

    @Override 
    protected void map(Text key, Text value, Context context) 
        throws IOException, InterruptedException { 
     context.getCounter(TestCounters.TEST).increment(1); 
     context.write(key, value); 
    } 
} 

Mi reductor es

public class CounterReducer extends Reducer<Text,Text,Text,LongWritable> { 

    @Override 
    protected void reduce(Text key, Iterable<Text> values, Context context) 
         throws IOException, InterruptedException { 
     Counter counter = context.getCounter(CounterMapper.TestCounters.TEST); 
     long counterValue = counter.getValue(); 
     context.write(key, new LongWritable(counterValue)); 
    } 
} 

contravalor es siempre 0. ¿Estoy haciendo algo mal o es sólo no es posible?

Respuesta

2

El objetivo de map/reduce es paralelizar los trabajos. Habrá muchos mapeadores/reductores únicos, por lo que el valor no sería correcto de todos modos, excepto para esa ejecución del par de mapa/reducción.

tienen un número de palabras ejemplo:

http://wiki.apache.org/hadoop/WordCount

podría cambiar el context.write (palabra, uno) para context.write (línea, uno)

1

Los valores globales de venta libre son nunca transmitir de vuelta a cada asignador o reductor. Si desea que el # de registros del asignador esté disponible para el reductor, deberá contar con algún mecanismo externo para hacerlo.

+0

El JobTracker mantiene un registro de los contadores. –

9

En la configuración del Reducer (JobConf), puede usar el objeto JobConf para buscar la identificación del trabajo del reductor. Con eso, su reductor puede crear su propio cliente de trabajo, es decir, una conexión con el rastreador de trabajos, y consultar los contadores para este trabajo (o cualquier trabajo para el caso).

// in the Reducer class... 
private long mapperCounter; 

@Override 
public void configure(JobConf conf) { 
    JobClient client = new JobClient(conf); 
    RunningJob parentJob = 
     client.getJob(JobID.forName(conf.get("mapred.job.id"))); 
    mapperCounter = parentJob.getCounters().getCounter(MAP_COUNTER_NAME); 
} 

Ahora puede usar mapperCounter dentro del método reduce().

Realmente necesita un try-catch aquí. Estoy usando la antigua API, pero no debería ser difícil adaptarla para la nueva API.

Tenga en cuenta que los contadores aplicadores todos deben estar finalizados antes de iniciar cualquier reductor, tan contrario al comentario de Justin Thomas, creo que debería obtener valores precisos (siempre y cuando los reductores no están incrementando el mismo contador!)

+0

Puede parecer contra-intuitivo que los contadores de los mapeadores no están disponibles en reductores, pero en los reductores 'Hadoop' puede comenzar la ejecución antes de que todos los mapeadores finalicen. En ese caso, el valor de un contador podría leerse diferente en momentos diferentes en los reductores. Para saber más acerca de cómo pueden los reductores iniciarse antes de que los mapeadores de tiempo finalicen la ejecución, visite esta publicación: http://stackoverflow.com/questions/11672676/when-do-reduce-tasks-start-in-hadoop – abhinavkulkarni

+2

@abhinavkulkarni En realidad , ** solo ** la fase de mezcla del reductor puede comenzar antes de que todos los mapeadores comiencen, lo cual es irrelevante para los contadores. Entonces, cuando comienza la fase de reducción del reductor, todos los contadores de mapeadores son correctos. De la misma publicación: "Por otro lado, ordenar y reducir solo puede comenzar una vez que todos los mapeadores hayan terminado". – vefthym

8

implementado la solución de Jeff G en la nueva API:

@Override 
    public void setup(Context context) throws IOException, InterruptedException{ 
     Configuration conf = context.getConfiguration(); 
     Cluster cluster = new Cluster(conf); 
     Job currentJob = cluster.getJob(context.getJobID()); 
     mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue(); 
    } 
+2

Intenté esto pero obtengo un error de excepción de punto nulo java en la siguiente línea mapperCounter = currentJob.getCounters(). FindCounter (COUNTER_NAME), donde reemplacé el COUNTER_NAME con mi contador personalizado –

+0

Parece que 'cluster.getJob (context.getJobID()); 'no funciona en la operación autónoma de hadoop. Cuando se ejecuta en el modo de clúster de nodo único, esto funciona para mí. – dauer

1

me preguntó this question, pero no he resolver mi problema. Sin embargo, una solución alternativa vino a mi mente. En el mapeador, se cuenta el número de palabras y se puede escribir en la salida intermedia con la tecla mínima (para que este valor esté en la cabeza) en la función de limpieza que corre al final del mapeador. En el reductor, el número de palabras se calcula sumando valores en la cabeza. El código de muestra y una parte de su salida están disponibles a continuación.

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

import java.io.IOException; 
import java.util.StringTokenizer; 

/** 
* Created by tolga on 1/26/16. 
*/ 
public class WordCount { 
    static enum TestCounters { TEST } 
    public static class Map extends Mapper<Object, Text, Text, LongWritable> { 
     private final static LongWritable one = new LongWritable(1); 
     private Text word = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      String line = value.toString(); 
      StringTokenizer tokenizer = new StringTokenizer(line); 
      while (tokenizer.hasMoreTokens()) { 
       word.set(tokenizer.nextToken()); 
       context.write(word, one); 
       context.getCounter(TestCounters.TEST).increment(1); 
      } 
     } 

     @Override 
     protected void cleanup(Context context) throws IOException, InterruptedException { 
      context.write(new Text("!"),new LongWritable(context.getCounter(TestCounters.TEST).getValue())); 
     } 
    } 

    public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> { 

     public void reduce(Text key, Iterable<LongWritable> values, Context context) 
       throws IOException, InterruptedException { 
      int sum = 0; 
      for (LongWritable val : values) { 
       sum += val.get(); 
      } 
      context.write(key, new LongWritable(sum)); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 

     Job job = new Job(conf, "WordCount"); 
     job.setJarByClass(WordCount.class); 

     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(LongWritable.class); 

     job.setMapperClass(Map.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(TextOutputFormat.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     job.waitForCompletion(true); 
    } 
} 

archivo de texto:

Turgut Özal University is a private university located in Ankara, Turkey. It was established in 2008 by the Turgut Özal Thought and Action Foundation and is named after former Turkish president Turgut Özal. 

salida intermedia

**! \t 33** 
 
2008 \t 1 
 
Action \t 1 
 
Ankara, \t 1 
 
Foundation \t 1 
 
It \t 1 
 
Thought \t 1 
 
Turgut \t 1 
 
Turgut \t 1 
 
Turgut \t 1

**! \t 33** 
 
2008 \t 1 
 
Action \t 1 
 
Ankara, \t 1 
 
Foundation \t 1 
 
It \t 1 
 
Thought \t 1 
 
Turgut \t 3

0

Mejora de la respuesta de Itzhaki

findCounter(COUNTER_NAME) ya no es compatible - https://hadoop.apache.org/docs/r2.7.0/api/org/apache/hadoop/mapred/Counters.html

@Override 
public void setup(Context context) throws IOException, InterruptedException{ 
    Configuration conf = context.getConfiguration(); 
    Cluster cluster = new Cluster(conf); 
    Job currentJob = cluster.getJob(context.getJobID()); 
    mapperCounter = currentJob.getCounters().findCounter(GROUP_NAME, COUNTER_NAME).getValue(); 
} 

GROUP_NAME se especifica, cuando se invoca el mostrador. p.ej.

context.getCounter("com.example.mycode", "MY_COUNTER").increment(1); 

continuación

mapperCounter = currentJob.getCounters().findCounter("com.example.mycode", "MY_COUNTER").getValue(); 

También, un punto importante que, si el contador no existe se inicializará con un valor de 0.

Cuestiones relacionadas