Necesito ejecutar un trabajo mapreduce que sea dinámico en el sentido de que los parámetros se deben pasar al mapa y reducir funciones cada vez que se ejecuta el trabajo mapreduce (por ejemplo, en respuesta a una solicitud del usuario).¿Cómo pasar los parámetros dinámicamente a la función de mapa en GAE mapreduce?
¿Cómo puedo lograr esto? No pude ver en ninguna parte de la documentación cómo hacer un procesamiento dinámico en tiempo de ejecución para el mapa y reducir.
class MatchProcessing(webapp2.RequestHandler):
def get(self):
requestKeyID=int(self.request.get('riderbeeRequestID'))
userKey=self.request.get('userKey')
pipeline = MatchingPipeline(requestKeyID, userKey)
pipeline.start()
self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
class MatchingPipeline(base_handler.PipelineBase):
def run(self, requestKeyID, userKey):
yield mapreduce_pipeline.MapreducePipeline(
"riderbee_matching",
"tasks.matchingMR.riderbee_map",
"tasks.matchingMR.riderbee_reduce",
"mapreduce.input_readers.DatastoreInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"entity_kind": "models.rides.RiderbeeRequest",
"requestKeyID": requestKeyID,
"userKey": userKey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16)
def riderbee_map(riderbeeRequest):
# would like to access the requestKeyID and userKey parameters that were passed in mapper_params
# so that we can do some processing based on that
yield (riderbeeRequest.user.email, riderbeeRequest.key().id())
def riderbee_reduce(key, values):
# would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params
# so that we can do some processing based on that
yield "%s: %s\n" % (key, len(values))
Ayuda por favor?
FYI ... aquí es cómo enviar datos a un puesto de trabajo en Java - http: // www. thecloudavenue.com/2011/11/passing-parameters-to-mappers-and.html –
Hmmm. El enlace que le das puntos a Hadoop. Esto es para GAE MapReduce ... –