Me encuentro con un problema extraño. Cuando ejecuto mi trabajo Hadoop en un gran conjunto de datos (> 1 TB comprimido archivos de texto), varias de las tareas reducir fallan, con stacktraces como éstas:Hadoop: fusión intermedia fallida
java.io.IOException: Task: attempt_201104061411_0002_r_000044_0 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:139)
at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:103)
at org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:335)
at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:350)
at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:156)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2698)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:375)
at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:241)
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:125)
... 8 more
java.io.IOException: Task: attempt_201104061411_0002_r_000056_0 - The reduce copier failed
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:385)
at org.apache.hadoop.mapred.Child$4.run(Child.java:240)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1115)
at org.apache.hadoop.mapred.Child.main(Child.java:234)
Caused by: java.io.IOException: Intermediate merge failed
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2714)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.run(ReduceTask.java:2639)
Caused by: java.lang.RuntimeException: java.io.EOFException
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:128)
at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:373)
at org.apache.hadoop.util.PriorityQueue.upHeap(PriorityQueue.java:123)
at org.apache.hadoop.util.PriorityQueue.put(PriorityQueue.java:50)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:447)
at org.apache.hadoop.mapred.Merger$MergeQueue.merge(Merger.java:381)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:107)
at org.apache.hadoop.mapred.Merger.merge(Merger.java:93)
at org.apache.hadoop.mapred.ReduceTask$ReduceCopier$InMemFSMergeThread.doInMemMerge(ReduceTask.java:2689)
... 1 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:180)
at org.apache.hadoop.io.Text.readString(Text.java:402)
at com.__.hadoop.pixel.segments.IpCookieCountFilter$IpAndIpCookieCount.readFields(IpCookieCountFilter.java:240)
at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:122)
... 9 more
No todos mis reductores fallan. Varios a menudo tienen éxito antes de ver los fracasos con los demás. Como puede ver, las stacktraces siempre parecen originarse desde IPAndIPCookieCount.readFields()
y siempre en la etapa de fusión en memoria, pero no siempre desde la misma parte de readFields
.
Este trabajo tiene éxito cuando se ejecuta en conjuntos de datos más pequeños (aproximadamente 1/30 del tamaño). Hay casi tantas salidas como entradas para el trabajo, pero cada registro de salida es más corto. Este trabajo es esencialmente una implementación de un tipo secundario.
Estamos utilizando la distribución CDH3 Hadoop.
Aquí es mi costumbre WritableComparable
aplicación:
public static class IpAndIpCookieCount implements WritableComparable<IpAndIpCookieCount> {
private String ip;
private int ipCookieCount;
public IpAndIpCookieCount() {
// empty constructor for hadoop
}
public IpAndIpCookieCount(String ip, int ipCookieCount) {
this.ip = ip;
this.ipCookieCount = ipCookieCount;
}
public String getIp() {
return ip;
}
public int getIpCookieCount() {
return ipCookieCount;
}
@Override
public void readFields(DataInput in) throws IOException {
ip = Text.readString(in);
ipCookieCount = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, ip);
out.writeInt(ipCookieCount);
}
@Override
public int compareTo(IpAndIpCookieCount other) {
int firstComparison = ip.compareTo(other.getIp());
if (firstComparison == 0) {
int otherIpCookieCount = other.getIpCookieCount();
if (ipCookieCount == otherIpCookieCount) {
return 0;
} else {
return ipCookieCount < otherIpCookieCount ? 1 : -1;
}
} else {
return firstComparison;
}
}
@Override
public boolean equals(Object o) {
if (o instanceof IpAndIpCookieCount) {
IpAndIpCookieCount other = (IpAndIpCookieCount) o;
return ip.equals(other.getIp()) && ipCookieCount == other.getIpCookieCount();
} else {
return false;
}
}
@Override
public int hashCode() {
return ip.hashCode()^ipCookieCount;
}
}
El método readFields
es muy simple, y no veo ningún problema en esta clase. Además, he visto a otras personas conseguir esencialmente la misma traza de la pila:
- http://lucene.472066.n3.nabble.com/Reduce-Copier-Failed-td2120228.html
- https://groups.google.com/a/cloudera.org/group/cdh-user/browse_thread/thread/3544da912bf66506
- http://www.listware.net/201010/hadoop-common-user/70382-merging-of-the-local-fs-files-threw-an-exception-javaioioexception-javalangruntimeexception-javaioeofexception.html
- http://mail-archives.apache.org/mod_mbox/hadoop-mapreduce-user/201101.mbox/%[email protected]%3E
- http://web.archiveorange.com/archive/v/5nvvZTgeqwCRQ3F9vEzI
Ninguno parecía realmente han descubierto el problema detrás de esto Los dos últimos parecen sugerir que esto podría ser un problema de memoria (aunque estas stacktraces no son OutOfMemoryException
s). Al igual que la segunda a la última publicación en esa lista de enlaces, he intentado establecer el número de reductores más alto (hasta 999), pero sigo teniendo fallas. No (todavía) intenté asignar más memoria para reducir tareas, ya que eso requeriría que reconfiguremos nuestro clúster.
¿Esto es un error en Hadoop? ¿O estoy haciendo algo mal?
EDIT: Mis datos se dividen por día. Si ejecuto el trabajo 7 veces, una vez por cada día, las 7 se completan. Si ejecuto un trabajo durante los 7 días, falla. El informe grande durante los 7 días verá exactamente las mismas claves que las más pequeñas (en conjunto), pero obviamente no en el mismo orden, en los mismos reductores, etc.
También encontré este problema, pero no aparece todo el tiempo. En el mismo conjunto de datos, que se ejecuta en las mismas condiciones, a veces mi trabajo falla con 'EOFException' causada por un método' readByte() 'en el método' WritableComparable.readFields (..) '. Creo que podría ser un problema de red que causa algún tipo de retraso. –