2010-08-16 19 views
16

Soy un novato en Hadoop. Estoy probando el programa Wordcount.MultipleOutputFormat en hadoop

Ahora para probar múltiples archivos de salida, uso MultipleOutputFormat. este enlace me ayudó a hacerlo. http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html

en mi clase del controlador que tuve

MultipleOutputs.addNamedOutput(conf, "even", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class); 

    MultipleOutputs.addNamedOutput(conf, "odd", 
      org.apache.hadoop.mapred.TextOutputFormat.class, Text.class, 
      IntWritable.class);` 

y mi reducen clase se convirtió en esta

public static class Reduce extends MapReduceBase implements 
     Reducer<Text, IntWritable, Text, IntWritable> { 
    MultipleOutputs mos = null; 

    public void configure(JobConf job) { 
     mos = new MultipleOutputs(job); 
    } 

    public void reduce(Text key, Iterator<IntWritable> values, 
      OutputCollector<Text, IntWritable> output, Reporter reporter) 
      throws IOException { 
     int sum = 0; 
     while (values.hasNext()) { 
      sum += values.next().get(); 
     } 
     if (sum % 2 == 0) { 
      mos.getCollector("even", reporter).collect(key, new IntWritable(sum)); 
     }else { 
      mos.getCollector("odd", reporter).collect(key, new IntWritable(sum)); 
     } 
     //output.collect(key, new IntWritable(sum)); 
    } 
    @Override 
    public void close() throws IOException { 
     // TODO Auto-generated method stub 
    mos.close(); 
    } 
} 

cosas funcionaron, pero consigo gran cantidad de archivos, (impar y uno para cada mapa -reduce)

La pregunta es: ¿Cómo puedo tener solo 2 archivos de salida (impar & incluso) para que cada salida impar de cada map-reduce se escriba en esa impar archivo, y lo mismo para incluso.

+5

está usando el MultipleOutputs no MultipleOutputFormat. Ambas son bibliotecas diferentes. –

Respuesta

3

Cada reductor utiliza un OutputFormat para escribir registros en. Es por eso que obtienes un conjunto de archivos pares e impares por reductor. Esto es por diseño para que cada reductor pueda realizar escrituras en paralelo.

Si solo desea un archivo impar e impar único, deberá establecer mapred.reduce.tasks en 1. Pero el rendimiento se verá afectado, ya que todos los correlacionadores se alimentarán en un solo reductor.

Otra opción es cambiar el proceso de la lectura de estos archivos para aceptar múltiples archivos de entrada, o escribir un proceso separado que combine estos archivos.

+3

en lugar de cambiar las tareas del mapa rojo, anulé la función getFilenameForKeyValue() y funcionó ... gracias. – raj

1

Se generarán varios archivos de salida en función del número de reductores.

Puede utilizar DFS Hadoop -getmerge a las salidas combinadas

+0

gracias :) pero necesito hacer esto solo con map reducir, – raj

3

escribí una clase para hacer esto. sólo lo utilizan su trabajo:

job.setOutputFormatClass(m_customOutputFormatClass); 

Ésta es la clase de mi:

import java.io.IOException; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Map.Entry; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.mapreduce.RecordWriter; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

/** 
* TextOutputFormat extension which enables writing the mapper/reducer's output in multiple files.<br> 
* <p> 
* <b>WARNING</b>: The number of different folder shuoldn't be large for one mapper since we keep an 
* {@link RecordWriter} instance per folder name. 
* </p> 
* <p> 
* In this class the folder name is defined by the written entry's key.<br> 
* To change this behavior simply extend this class and override the 
* {@link HdMultipleFileOutputFormat#getFolderNameExtractor()} method and create your own 
* {@link FolderNameExtractor} implementation. 
* </p> 
* 
* 
* @author ykesten 
* 
* @param <K> - Keys type 
* @param <V> - Values type 
*/ 
public class HdMultipleFileOutputFormat<K, V> extends TextOutputFormat<K, V> { 

    private String folderName; 

    private class MultipleFilesRecordWriter extends RecordWriter<K, V> { 

     private Map<String, RecordWriter<K, V>> fileNameToWriter; 
     private FolderNameExtractor<K, V> fileNameExtractor; 
     private TaskAttemptContext job; 

     public MultipleFilesRecordWriter(FolderNameExtractor<K, V> fileNameExtractor, TaskAttemptContext job) { 
      fileNameToWriter = new HashMap<String, RecordWriter<K, V>>(); 
      this.fileNameExtractor = fileNameExtractor; 
      this.job = job; 
     } 

     @Override 
     public void write(K key, V value) throws IOException, InterruptedException { 
      String fileName = fileNameExtractor.extractFolderName(key, value); 
      RecordWriter<K, V> writer = fileNameToWriter.get(fileName); 
      if (writer == null) { 
       writer = createNewWriter(fileName, fileNameToWriter, job); 
       if (writer == null) { 
        throw new IOException("Unable to create writer for path: " + fileName); 
       } 
      } 
      writer.write(key, value); 
     } 

     @Override 
     public void close(TaskAttemptContext context) throws IOException, InterruptedException { 
      for (Entry<String, RecordWriter<K, V>> entry : fileNameToWriter.entrySet()) { 
       entry.getValue().close(context); 
      } 
     } 

    } 

    private synchronized RecordWriter<K, V> createNewWriter(String folderName, 
      Map<String, RecordWriter<K, V>> fileNameToWriter, TaskAttemptContext job) { 
     try { 
      this.folderName = folderName; 
      RecordWriter<K, V> writer = super.getRecordWriter(job); 
      this.folderName = null; 
      fileNameToWriter.put(folderName, writer); 
      return writer; 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 

    @Override 
    public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException { 
     Path path = super.getDefaultWorkFile(context, extension); 
     if (folderName != null) { 
      String newPath = path.getParent().toString() + "/" + folderName + "/" + path.getName(); 
      path = new Path(newPath); 
     } 
     return path; 
    } 

    @Override 
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { 
     return new MultipleFilesRecordWriter(getFolderNameExtractor(), job); 
    } 

    public FolderNameExtractor<K, V> getFolderNameExtractor() { 
     return new KeyFolderNameExtractor<K, V>(); 
    } 

    public interface FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value); 
    } 

    private static class KeyFolderNameExtractor<K, V> implements FolderNameExtractor<K, V> { 
     public String extractFolderName(K key, V value) { 
      return key.toString(); 
     } 
    } 

}