7

todo funciona bien a nivel local cuando lo haga de la siguiente manera:Broken error Pipe causa de streaming trabajo Elastic MapReduce en AWS a fallar

cat input | python mapper.py | sort | python reducer.py 

sin embargo, cuando ejecuto el trabajo de transmisión de MapReduce en AWS Elastic MapReduce, el trabajo no lo hace completar con éxito El mapper.py se ejecuta en parte (lo sé porque escribí en stderr en el camino). El asignador es interrumpida por un error de "Tubería rota", que soy capaz de recuperar de registro del sistema del intento de tareas después de que falla:

java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done 
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1 
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds. 
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation 
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.io.IOException: log:null 
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s] 
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null 
HOST=null 
USER=hadoop 
HADOOP_USER=null 
last Hadoop input: |null| 
last tool output: |text/html 1| 
Date: Mon Mar 26 07:19:05 UTC 2012 
java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task 
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written 

Aquí es mapper.py. Tenga en cuenta que escribo en stderr para proporcionar a mí mismo con la depuración de información:

#!/usr/bin/env python 

import sys 
from warc import ARCFile 

def main(): 
    warc_file = ARCFile(fileobj=sys.stdin) 
    for web_page in warc_file: 
     print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging 
     print '%s\t%s' % (web_page.header.content_type, 1) 
    print >> sys.stderr, 'done' #For debugging 
if __name__ == "__main__": 
    main() 

Aquí es lo que me pasa en el stderr para el intento de tareas cuando la mapper.py es ejecutado:

text/html 1 
text/html 1 
text/html 1 

Básicamente, el el bucle se ejecuta 3 veces y luego se detiene abruptamente sin que pitón arroje ningún error. (Nota: es debería dando salida a miles de líneas). Incluso una excepción no detectada debería aparecer en stderr.

Como MapReduce funciona completamente bien en mi computadora local, creo que esto es un problema con la forma en que Hadoop trata con la salida que estoy imprimiendo desde mapper.py. Pero no tengo idea de cuál podría ser el problema.

Respuesta

9

Su proceso de transmisión (su secuencia de comandos de Python) está terminando prematuramente. Se le puede hacer pensar que la información está completa (por ejemplo, interpretar un EOF) o una excepción tragada. De cualquier forma, Hadoop está tratando de ingresar vía STDIN a su script, pero como la aplicación ha finalizado (y, por lo tanto, STDIN ya no es un descriptor de archivo válido), recibirá un error de BrokenPipe. Sugeriría agregar rastros de stderr en su script para ver qué línea de entrada está causando el problema. codificación feliz,

-Geoff

+4

babonk, puede ¿proporciona detalles sobre cómo resolvió su problema utilizando este consejo? –

+0

Lo mismo. Aparentemente tengo un error similar aquí: http: // stackoverflow.com/questions/18556270/aws-elástico-mapreduce-doesnt-seem-to-be-incorrect-converting-the-streaming-to-j, y dado que funciona cuando se canaliza, no sé cómo " arreglarlo para la transmisión. – Mittenchops

1

no tengo ninguna experiencia con Hadoop en AWS pero tenía el mismo error en un clúster Hadoop regulares - y en mi caso el problema fue como empecé pitón -mapper ./mapper.py -reducer ./reducer.py funcionaba pero -mapper python mapper.py didn' t.

Parece que también usa un paquete de python no estándar warc ¿envía los archivos necesarios al streamjob? -cacheFiles o -cacheArchive podría ser útil.

+0

¿Cómo se incluyen paquetes de python no estándar? El mapreduce elástico de AWS, en particular, no parece ofrecer opciones como los archivos caché. – Mittenchops

6

Esto se dice más arriba, pero permítanme intentar aclararlo: ¡debe bloquear en stdin, incluso si no lo necesita! Esto es no lo mismo que las tuberías de Linux, así que no dejes que te engañe. Lo que ocurre, intuitivamente, es que Streaming levanta su ejecutable y luego dice: "espere aquí mientras recibo comentarios para usted". Si tu ejecutable se detiene por algún motivo antes de que Streaming te envíe el 100% de la entrada, Streaming dice: "Oye, ¿a dónde fue ese ejecutable que me puse de pie? ... Hmmmm ... el tubo está roto, déjame plantear esa excepción ! " Por lo tanto, aquí hay un código de pitón, lo único que hace es lo que hace el gato, pero tendrá en cuenta que sólo se procesa no salir hasta que toda la entrada de este código, y que es el punto clave:

#!/usr/bin/python 
import sys 

while True: 
    s = sys.stdin.readline() 
    if not s: 
     break 
    sys.stdout.write(s) 
+1

Estaba recibiendo este error porque no estaba haciendo nada con la entrada. Agregué este código (aunque no hace nada por mí) y el error fue. – schoon

Cuestiones relacionadas