6

Acabo de ver Batch data processing with App Engine session of Google I/O 2010, leo algunas partes de MapReduce article from Google Research y ahora estoy pensando en utilizar MapReduce on Google App Engine para implementar un sistema de recomendación en Python.MapReduce en más de un tipo de almacén de datos en Google App Engine

Prefiero usar appengine-mapreduce en lugar de Task Queue API porque el primero ofrece iteración fácil en todas las instancias de algún tipo, lotes automáticos, encadenamiento de tareas automático, etc. El problema es: mi sistema de recomendación necesita calcular la correlación entre instancias de dos modelos diferentes, es decir, instancias de dos tipos distintos.

Ejemplo: Tengo estos dos modelos: Usuario y Artículo. Cada uno tiene una lista de etiquetas como un atributo. A continuación se muestran las funciones para calcular la correlación entre usuarios y elementos. Tenga en cuenta que calculateCorrelation deberían ser llamados para cada combinación de los usuarios y los artículos:

def calculateCorrelation(user, item): 
    return calculateCorrelationAverage(u.tags, i.tags) 

def calculateCorrelationAverage(tags1, tags2): 
    correlationSum = 0.0 
    for (tag1, tag2) in allCombinations(tags1, tags2): 
     correlationSum += correlation(tag1, tag2) 
    return correlationSum/(len(tags1) + len(tags2)) 

def allCombinations(list1, list2): 
    combinations = [] 
    for x in list1: 
     for y in list2: 
      combinations.append((x, y)) 
    return combinations    

Pero eso no es un calculateCorrelation Mapper válido en appengine-MapReduce y tal vez esta función no es compatible incluso con el concepto de computación MapReduce. Sin embargo, necesito estar seguro ... sería realmente genial para mí tener esas ventajas de reducción de correspondencia apilada como el procesamiento automático de lotes y el encadenamiento de tareas.

¿Hay alguna solución para eso?

¿Debo definir mi propio InputReader? Un nuevo InputReader que lee todas las instancias de dos tipos diferentes es compatible con la implementación actual de appengine-mapreduce?

¿O debería intentar lo siguiente?

  • Combinar todas las llaves de todas las entidades de estas dos clases, de dos en dos, a instancias de un modelo nuevo (posiblemente utilizando MapReduce)
  • Iterar el uso de creadores de mapas sobre las instancias de este nuevo modelo
  • Para cada Por ejemplo, use claves dentro de él para obtener las dos entidades de diferentes tipos y calcule la correlación entre ellas.
+0

¿Cuáles son los criterios para aprobó en Usuarios y equipos? ¿Es cada combinación de usuario y artículo? ¿Solo los que están relacionados de alguna manera? Además, ¿qué idioma es ese? ¡No es (bastante) Python! –

+0

'calculateCorrelation' debe llamarse para cada combinación de usuario y artículo. Y ahora eliminé los tipos de variables para evitar confusiones. – fjsj

Respuesta

3

Siguiendo la sugerencia de Nick Johnson, escribí mi propio InputReader. Este lector busca entidades de dos tipos diferentes. Da tuplas con todas las combinaciones de estas entidades. Aquí está:

class TwoKindsInputReader(InputReader): 
    _APP_PARAM = "_app" 
    _KIND1_PARAM = "kind1" 
    _KIND2_PARAM = "kind2" 
    MAPPER_PARAMS = "mapper_params" 

    def __init__(self, reader1, reader2): 
     self._reader1 = reader1 
     self._reader2 = reader2 

    def __iter__(self): 
     for u in self._reader1: 
      for e in self._reader2: 
       yield (u, e) 

    @classmethod 
    def from_json(cls, input_shard_state): 
     reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM]) 
     reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM]) 

     return cls(reader1, reader2) 

    def to_json(self): 
     json_dict = {} 
     json_dict[self._KIND1_PARAM] = self._reader1.to_json() 
     json_dict[self._KIND2_PARAM] = self._reader2.to_json() 
     return json_dict 

    @classmethod 
    def split_input(cls, mapper_spec): 
     params = mapper_spec.params 
     app = params.get(cls._APP_PARAM) 
     kind1 = params.get(cls._KIND1_PARAM) 
     kind2 = params.get(cls._KIND2_PARAM) 
     shard_count = mapper_spec.shard_count 
     shard_count_sqrt = int(math.sqrt(shard_count)) 

     splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt) 
     splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt) 
     inputs = [] 

     for u in splitted1: 
      for e in splitted2: 
       inputs.append(TwoKindsInputReader(u, e)) 

     #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py) 
     return inputs 

    @classmethod 
    def validate(cls, mapper_spec): 
     return True #TODO 

Este código se debe utilizar cuando se necesita para procesar todas las combinaciones de entidades de dos tipos. También puede generalizar esto para más de dos tipos.

Aquí se trata de una válida la mapreduce.yaml para TwoKindsInputReader:

mapreduce: 
- name: recommendationMapReduce 
    mapper: 
    input_reader: customInputReaders.TwoKindsInputReader 
    handler: recommendation.calculateCorrelationHandler 
    params: 
    - name: kind1 
     default: kinds.User 
    - name: kind2 
     default: kinds.Item 
    - name: shard_count 
     default: 16 
2

Es difícil saber qué recomendar sin más detalles de lo que en realidad está calculando. Una opción simple es simplemente recuperar la entidad relacionada dentro de la llamada al mapa; no hay nada que le impida realizar operaciones de almacenamiento de datos allí.

Sin embargo, esto dará lugar a muchas llamadas pequeñas. Escribir un lector de entrada personalizado, como usted sugiere, le permitirá buscar ambos conjuntos de entidades en paralelo, lo que mejorará significativamente el rendimiento.

Si proporciona más detalles sobre cómo debe unirse a estas entidades, es posible que podamos brindarle sugerencias más concretas.

+0

¡Acabo de añadir más información! – fjsj

Cuestiones relacionadas