2012-06-29 5 views
7

Necesito ejecutar un trabajo mapreduce que sea dinámico en el sentido de que los parámetros se deben pasar al mapa y reducir funciones cada vez que se ejecuta el trabajo mapreduce (por ejemplo, en respuesta a una solicitud del usuario).¿Cómo pasar los parámetros dinámicamente a la función de mapa en GAE mapreduce?

¿Cómo puedo lograr esto? No pude ver en ninguna parte de la documentación cómo hacer un procesamiento dinámico en tiempo de ejecución para el mapa y reducir.

class MatchProcessing(webapp2.RequestHandler): 

    def get(self): 
     requestKeyID=int(self.request.get('riderbeeRequestID')) 
     userKey=self.request.get('userKey') 
     pipeline = MatchingPipeline(requestKeyID, userKey) 
     pipeline.start() 
     self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id) 


class MatchingPipeline(base_handler.PipelineBase): 
    def run(self, requestKeyID, userKey): 
     yield mapreduce_pipeline.MapreducePipeline(
      "riderbee_matching", 
      "tasks.matchingMR.riderbee_map", 
      "tasks.matchingMR.riderbee_reduce", 
      "mapreduce.input_readers.DatastoreInputReader", 
      "mapreduce.output_writers.BlobstoreOutputWriter", 
      mapper_params={ 
       "entity_kind": "models.rides.RiderbeeRequest", 
       "requestKeyID": requestKeyID, 
       "userKey": userKey, 
      }, 
      reducer_params={ 
       "mime_type": "text/plain", 
      }, 
      shards=16) 


def riderbee_map(riderbeeRequest): 
    # would like to access the requestKeyID and userKey parameters that were passed in mapper_params 
    # so that we can do some processing based on that 

    yield (riderbeeRequest.user.email, riderbeeRequest.key().id()) 


def riderbee_reduce(key, values): 
    # would like to access the requestKeyID and userKey parameters that were passed earlier, perhaps through reducer_params 
    # so that we can do some processing based on that 

    yield "%s: %s\n" % (key, len(values)) 

Ayuda por favor?

+0

FYI ... aquí es cómo enviar datos a un puesto de trabajo en Java - http: // www. thecloudavenue.com/2011/11/passing-parameters-to-mappers-and.html –

+0

Hmmm. El enlace que le das puntos a Hadoop. Esto es para GAE MapReduce ... –

Respuesta

5

Estoy bastante seguro de que solo puede especificar parámetros en mapper_parameters y leerlos desde el módulo de contexto. Vea http://code.google.com/p/appengine-mapreduce/wiki/UserGuidePython#Mapper_parameters para más detalles.

+0

Excepto que en el enlace que refiere a esos parámetros no son dinámicos (es decir, no se pasan de forma programática en el tiempo de ejecución) sino que se leen de mapreduce.yaml, que es estático. Es decir, a menos que no entienda cómo funcionaría esto –

+0

En el código que utiliza arriba, hay un parametro mapper_params para el constructor de MapreducePipeline; también hay un parámetro mapreduce_parameters param. Estos son el equivalente de los params que provienen del archivo yaml. Vea el código aquí: http://code.google.com/p/appengine-mapreduce/source/browse/trunk/python/src/mapreduce/control.py –

4

Esta es la forma de acceder a los parámetros del asignador de la función de mapeo, utilizando el módulo de contexto:

from mapreduce import context 

def riderbee_map(riderbeeRequest): 
    ctx = context.get() 
    params = ctx.mapreduce_spec.mapper.params 
    requestKeyID = params["requestKeyID"] 
Cuestiones relacionadas