Broken Pipe error causes streaming Elastic MapReduce job on AWS to fail

Everything works fine locally when I run it as follows:
cat input | python mapper.py | sort | python reducer.py
However, when I run the MapReduce streaming job on AWS Elastic MapReduce, the job does not complete successfully. mapper.py runs partway through (I know this because I write to stderr along the way). The mapper is interrupted by a "Broken pipe" error, which I was able to recover from the syslog of the task attempt after it fails:
java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
at java.io.DataOutputStream.write(DataOutputStream.java:90)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
at java.io.DataOutputStream.flush(DataOutputStream.java:106)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
at java.io.DataOutputStream.flush(DataOutputStream.java:106)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.io.IOException: log:null
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=hadoop
HADOOP_USER=null
last Hadoop input: |null|
last tool output: |text/html 1|
Date: Mon Mar 26 07:19:05 UTC 2012
java.io.IOException: Broken pipe
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:282)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
at java.io.DataOutputStream.write(DataOutputStream.java:90)
at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written
Here is mapper.py. Note that I write to stderr to give myself debugging information:
#!/usr/bin/env python

import sys
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) # For debugging
        print '%s\t%s' % (web_page.header.content_type, 1)
    print >> sys.stderr, 'done' # For debugging

if __name__ == "__main__":
    main()
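One thing I considered is stdout buffering: when stdout feeds a pipe instead of a terminal it is block-buffered, so a variant that flushes after every record would rule that out. This is just a sketch of that idea, assuming the same warc library as above; I have not run it on EMR:

#!/usr/bin/env python
# Sketch (untested on EMR): the same loop as above, but flushing stdout after
# each record in case block buffering is swallowing output.
import sys
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print '%s\t%s' % (web_page.header.content_type, 1)
        sys.stdout.flush() # push each record through the pipe immediately

if __name__ == "__main__":
    main()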
Here is what I get on stderr for the task attempt when mapper.py is run:
text/html 1
text/html 1
text/html 1
Basically, the loop runs three times and then stops abruptly without Python throwing any error. (Note: it should be outputting thousands of lines.) Even an uncaught exception should have appeared on stderr.
Since the MapReduce job works completely fine on my local machine, my guess is that this is a problem with how Hadoop handles the output I'm printing from mapper.py, but I have no idea what the problem could be.
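In case it helps anyone reproduce this, a diagnostic wrapper like the one below (hypothetical; the structure is mine) should force any exception the mapper hits, including a broken pipe while writing to stdout, onto stderr so it shows up in the task attempt's logs:

#!/usr/bin/env python
# Hypothetical diagnostic wrapper: run the mapper and print any traceback to
# stderr, which Hadoop streaming captures in the task attempt's stderr log.
import sys
import traceback
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print '%s\t%s' % (web_page.header.content_type, 1)

if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc(file=sys.stderr) # make the failure visible in the logs
        sys.exit(1)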
babonk, can you provide details on how you resolved your issue using this tip? –
Same here. I apparently have a similar error: http://stackoverflow.com/questions/18556270/aws-elastic-mapreduce-doesnt-seem-to-be-correctly-converting-the-streaming-to-j, and since it works when piped, I don't know how to fix it for streaming. – Mittenchops