2011-04-07 14 views
7

Me encuentro con un problema extraño. Cuando ejecuto mi trabajo Hadoop en un gran conjunto de datos (> 1 TB comprimido archivos de texto), varias de las tareas reducir fallan, con stacktraces como éstas:Hadoop: fusión intermedia fallida

java.io.IOException: Task: attempt_201104061411_0002_r_000044_0 - The reduce copier failed 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115) 
    at org.apache.hadoop.mapred.Child.main(Child.java:234) 
Caused by: java.io.IOException: Intermediate merge failed 
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714) 
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639) 
Caused by: java.lang.RuntimeException: java.io.EOFException 
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128) 
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373) 
    at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:139) 
    at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103) 
    at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335) 
    at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350) 
    at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156) 
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2698) 
    ... 1 more 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readInt(DataInputStream.java:375) 
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:241) 
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125) 
    ... 8 more 
java.io.IOException: Task: attempt_201104061411_0002_r_000056_0 - The reduce copier failed 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:240) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115) 
    at org.apache.hadoop.mapred.Child.main(Child.java:234) 
Caused by: java.io.IOException: Intermediate merge failed 
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714) 
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639) 
Caused by: java.lang.RuntimeException: java.io.EOFException 
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128) 
    at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373) 
    at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:123) 
    at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:50) 
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:447) 
    at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381) 
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:107) 
    at org.apache.hadoop.mapred.Merger.merge(Merger.java:93) 
    at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2689) 
    ... 1 more 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readFully(DataInputStream.java:180) 
    at org.apache.hadoop.io.Text.readString(Text.java:402) 
    at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:240) 
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122) 
    ... 9 more 

No todos mis reductores fallan. Varios a menudo tienen éxito antes de ver los fracasos con los demás. Como puede ver, las stacktraces siempre parecen originarse desde IPAndIPCookieCount.readFields() y siempre en la etapa de fusión en memoria, pero no siempre desde la misma parte de readFields.

Este trabajo tiene éxito cuando se ejecuta en conjuntos de datos más pequeños (aproximadamente 1/30 del tamaño). Hay casi tantas salidas como entradas para el trabajo, pero cada registro de salida es más corto. Este trabajo es esencialmente una implementación de un tipo secundario.

Estamos utilizando la distribución CDH3 Hadoop.

Aquí es mi costumbre WritableComparable aplicación:

public static class IpAndIpCookieCount implements WritableComparable<IpAndIpCookieCount> { 

     private String ip; 
     private int ipCookieCount; 

     public IpAndIpCookieCount() { 
      // empty constructor for hadoop 
     } 

     public IpAndIpCookieCount(String ip, int ipCookieCount) { 
      this.ip = ip; 
      this.ipCookieCount = ipCookieCount; 
     } 

     public String getIp() { 
      return ip; 
     } 

     public int getIpCookieCount() { 
      return ipCookieCount; 
     } 

     @Override 
     public void readFields(DataInput in) throws IOException { 
      ip = Text.readString(in); 
      ipCookieCount = in.readInt(); 
     } 

     @Override 
     public void write(DataOutput out) throws IOException { 
      Text.writeString(out, ip); 
      out.writeInt(ipCookieCount); 
     } 

     @Override 
     public int compareTo(IpAndIpCookieCount other) { 
      int firstComparison = ip.compareTo(other.getIp()); 
      if (firstComparison == 0) { 
       int otherIpCookieCount = other.getIpCookieCount(); 
       if (ipCookieCount == otherIpCookieCount) { 
        return 0; 
       } else { 
        return ipCookieCount < otherIpCookieCount ? 1 : -1; 
       } 
      } else { 
       return firstComparison; 
      } 
     } 

     @Override 
     public boolean equals(Object o) { 
      if (o instanceof IpAndIpCookieCount) { 
       IpAndIpCookieCount other = (IpAndIpCookieCount) o; 
       return ip.equals(other.getIp()) && ipCookieCount == other.getIpCookieCount(); 
      } else { 
       return false; 
      } 
     } 

     @Override 
     public int hashCode() { 
      return ip.hashCode()^ipCookieCount; 
     } 

    } 

El método readFields es muy simple, y no veo ningún problema en esta clase. Además, he visto a otras personas conseguir esencialmente la misma traza de la pila:

Ninguno parecía realmente han descubierto el problema detrás de esto Los dos últimos parecen sugerir que esto podría ser un problema de memoria (aunque estas stacktraces no son OutOfMemoryException s). Al igual que la segunda a la última publicación en esa lista de enlaces, he intentado establecer el número de reductores más alto (hasta 999), pero sigo teniendo fallas. No (todavía) intenté asignar más memoria para reducir tareas, ya que eso requeriría que reconfiguremos nuestro clúster.

¿Esto es un error en Hadoop? ¿O estoy haciendo algo mal?

EDIT: Mis datos se dividen por día. Si ejecuto el trabajo 7 veces, una vez por cada día, las 7 se completan. Si ejecuto un trabajo durante los 7 días, falla. El informe grande durante los 7 días verá exactamente las mismas claves que las más pequeñas (en conjunto), pero obviamente no en el mismo orden, en los mismos reductores, etc.

+0

También encontré este problema, pero no aparece todo el tiempo. En el mismo conjunto de datos, que se ejecuta en las mismas condiciones, a veces mi trabajo falla con 'EOFException' causada por un método' readByte() 'en el método' WritableComparable.readFields (..) '. Creo que podría ser un problema de red que causa algún tipo de retraso. –

Respuesta

1

Creo que esto es un artefacto de Cloudera backport de MAPREDUCE-947 a CDH3. Este parche da como resultado la formación de un archivo _SUCCESS para un trabajo exitoso.

También se crea un archivo _SUCCESS en la carpeta de salida para trabajos exitosos. Un parámetro de configuración mapreduce.fileoutputcommitter.marksuccessfuljobs se puede establecer en false para deshabilitar la creación del archivo _SUCCESS, o en true para permitir la creación del archivo _SUCCESS.

En cuanto a su error,

Caused by: java.io.EOFException 
    at java.io.DataInputStream.readFully(DataInputStream.java:180) 

y comparándolo con los errores que he visto para este problema antes,

Exception in thread "main" java.io.EOFException 
    at java.io.DataInputStream.readFully(DataInputStream.java:180) 
    at java.io.DataInputStream.readFully(DataInputStream.java:152) 
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1465) 
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1437) 
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424) 
    at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419) 
    at org.apache.hadoop.mapred.SequenceFileOutputFormat.getReaders(SequenceFileOutputFormat.java:89) 
    at org.apache.nutch.crawl.CrawlDbReader.processStatJob(CrawlDbReader.java:323) 
    at org.apache.nutch.crawl.CrawlDbReader.main(CrawlDbReader.java:511) 

y en el Mahout mailing list

Exception in thread "main" java.io.EOFException 
    at java.io.DataInputStream.readFully(DataInputStream.java:180) 
    at java.io.DataInputStream.readFully(DataInputStream.java:152) 
    at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:1457) 
    at 
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1435) 
    at 
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1424) 
    at 
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1419) 
    at 
org.apache.mahout.df.mapreduce.partial.Step0Job.parseOutput(Step0Job.java:145) 
    at 
org.apache.mahout.df.mapreduce.partial.Step0Job.run(Step0Job.java:119) 
    at 
org.apache.mahout.df.mapreduce.partial.PartialBuilder.parseOutput(PartialBuilder.java:115) 
    at org.apache.mahout.df.mapreduce.Builder.build(Builder.java:338) 
    at 
org.apache.mahout.df.mapreduce.BuildForest.buildForest(BuildForest.java:195) 

parece que DataInputStream.readFully es chok ed por este archivo.

Sugeriría establecer mapreduce.fileoutputcommitter.marksuccessfuljobs en falso y reintentar su trabajo, debería funcionar.

+0

No creo que ese fuera el problema. ¿Por qué se debería leer el archivo _SUCCESS durante la fusión en memoria? Ni siquiera debería haber sido creado aún. Sin embargo, parece que esto es específico de Cloudera. Hemos creado un nuevo clúster Hadoop estándar y ya no recibo este error. – ajduff574

+0

Además, este trabajo no se ejecuta en la salida de otro trabajo, por lo que no hay ningún archivo _SUCCESS en la entrada. – ajduff574

+0

¿Trataste de configurar el parámetro? – viksit

Cuestiones relacionadas