2011-05-19 13 views
9

Estoy algo confundido con el estado actual del soporte mapreduce en GAE. De acuerdo con los documentos http://code.google.com/p/appengine-mapreduce/, la fase de reducción aún no se admite, pero en la descripción de la sesión de E/S 2011 (http://www.youtube.com/watch?v=EIxelKcyCC0) está escrito "Ahora es posible ejecutar trabajos completos de reducción de mapas en App Engine". Me pregunto si puedo usar mapreduce en esta tarea:Ejemplo de contador simple usando mapreduce en Google App Engine

Lo que quiero hacer:

Tengo modelo de coche con campo color:

class Car(db.Model): 
    color = db.StringProperty() 

Quiero correr MapReduce (de de vez en cuando, cron-defined) que puede calcular cuántos automóviles hay en cada color y almacenar este resultado en el almacén de datos. Parece un trabajo bien adaptado para mapreduce (pero si me equivoco, corrígeme), el "mapa" de fase producirá pares (, 1) para cada entidad del coche, y la fase "reducir" debería fusionar estos datos por el color_name que me da los resultados esperados . Resultado final quiero llegar son entidades con los datos calculados almacenados en el almacén de datos, algo así:

class CarsByColor(db.Model): 
    color_name = db.StringProperty() 
    cars_num = db.IntegerProperty() 

Problema: No sé cómo implementar esto en appengine ... El vídeo muestra ejemplos con un mapa definido y funciones de reducción, pero parecen ser ejemplos muy generales no relacionados con el almacén de datos. Todos los demás ejemplos que encontré están usando una función para procesar los datos de DatastoreInputReader, pero parecen ser solo la fase de "mapa", no hay ningún ejemplo de cómo hacer el "reducir" (y cómo almacenar reducir los resultados en el Almacén de datos).

Respuesta

6

Proporciono aquí la solución que encontré eventualmente usando mapreduce de GAE (sin fase de reducción). Si hubiera empezado desde el principio, probablemente habría utilizado la solución proporcionada por Drew Sears.

funciona en GAE Python 1.5.0

En aplicación.yaml he añadido el controlador para mapreduce:

- url: /mapreduce(/.*)? 
    script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py 

y el manejador para mi código para mapreduce (estoy usando url/mapred_update para recoger los resultados producidos por mapreduce):

- url: /mapred_.* 
    script: mapred.py 

Created mapreduce.yaml para las entidades de procesamiento de coches:

mapreduce: 
- name: Color_Counter 
    params: 
    - name: done_callback 
    value: /mapred_update 
    mapper: 
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader 
    handler: mapred.process 
    params: 
    - name: entity_kind 
     default: models.Car 

Explicación: done_callback es una url que se llama después de que mapreduce finalice sus operaciones. mapred.process es una función que procesa entidades individuales y contadores de actualización (está definido en el archivo mapred.py). Modelo coche se define en models.py

mapred.py:

from models import CarsByColor 
from google.appengine.ext import db 
from google.appengine.ext.mapreduce import operation as op 
from google.appengine.ext.mapreduce.model import MapreduceState 

from google.appengine.ext import webapp 
from google.appengine.ext.webapp.util import run_wsgi_app 

def process(entity): 
    """Process individual Car""" 
    color = entity.color 
    if color: 
     yield op.counters.Increment('car_color_%s' % color) 

class UpdateCounters(webapp.RequestHandler): 
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters""" 
    def post(self): 
     """Called after mapreduce operation are finished""" 
     # Finished mapreduce job id is passed in request headers 
     job_id = self.request.headers['Mapreduce-Id'] 
     state = MapreduceState.get_by_job_id(job_id) 
     to_put = [] 
     counters = state.counters_map.counters 
     # Remove counter not needed for stats 
     del counters['mapper_calls'] 
     for counter in counters.keys(): 
      stat = CarsByColor.get_by_key_name(counter) 
      if not stat: 
       stat = CarsByColor(key_name=counter, 
           name=counter) 
      stat.value = counters[counter] 
      to_put.append(stat) 
     db.put(to_put) 

     self.response.headers['Content-Type'] = 'text/plain' 
     self.response.out.write('Updated.') 


application = webapp.WSGIApplication(
            [('/mapred_update', UpdateCounters)], 
            debug=True) 
def main(): 
    run_wsgi_app(application) 

if __name__ == "__main__": 
    main()    

No es ligeramente cambiada definición de modelo de CarsByColor en comparación a la pregunta.

Puede iniciar manualmente el trabajo de mapreduce desde url: http://yourapp/mapreduce/ y con suerte desde cron (aún no he probado el cron).

9

Realmente no necesita una fase de reducción. Esto se puede hacer con una cadena tarea lineal, más o menos como sigue:

def count_colors(limit=100, totals={}, cursor=None): 
    query = Car.all() 
    if cursor: 
    query.with_cursor(cursor) 
    cars = query.fetch(limit) 
    for car in cars: 
    try: 
     totals[car.color] += 1 
    except KeyError: 
     totals[car.color] = 1 
    if len(cars) == limit: 
    cursor = query.cursor() 
    return deferred.defer(count_colors, limit, totals, cursor) 
    entities = [] 
    for color in totals: 
    entity = CarsByColor(key_name=color) 
    entity.cars_num = totals[color] 
    entities.append(entity) 
    db.put(entities) 

deferred.defer(count_colors) 

Esto debe iterar sobre todos sus coches, pasar un cursor de consulta y una cuenta corriente a una serie de tareas ad-hoc, y almacenar los totales al final.

Una fase de reducción podría tener sentido si tuviera que fusionar datos de múltiples almacenes de datos, modelos múltiples o índices múltiples en un solo modelo. Como es que no creo que te compre nada.

Otra opción: use la cola de tareas para mantener los contadores activos para cada color. Cuando creas un auto, inicia una tarea para incrementar el total de ese color. Cuando actualice un automóvil, inicie una tarea para disminuir el color anterior y otro para incrementar el nuevo color. Actualice los contadores de forma transaccional para evitar condiciones de carrera.