2012-02-12 10 views
8

Estoy trabajando en la función appengine-mapreduce y he modificado la demostración para que se ajuste a mi propósito. Básicamente tengo un millón de líneas en el siguiente formato: ID de usuario, hora1, hora2. Mi propósito es encontrar la diferencia entre time1 y time2 para cada userid.Memoria límite golpeado con appengine-mapreduce

Sin embargo, mientras corro esto en Google App Engine, me encontré con este mensaje de error en la sección de registros:

superado el límite de memoria privada suave con 180.56 MB después de dar servicio 130 pedidos Todos los bien el manejo de esta solicitud, el Se encontró que el proceso que manejó esta solicitud usaba demasiada memoria y se terminó. Es probable que esto provoque que se use un nuevo proceso para la siguiente solicitud a su aplicación. Si ve este mensaje con frecuencia, puede tener una pérdida de memoria en su aplicación.

def time_count_map(data): 
    """Time count map function.""" 
    (entry, text_fn) = data 
    text = text_fn() 

    try: 
    q = text.split('\n') 
    for m in q: 
     reader = csv.reader([m.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
    logging.debug(e) 


def time_count_reduce(key, values): 
    """Time count reduce function.""" 
    time = 0.0 
    for subtime in values: 
    time += float(subtime) 
    realtime = int(time) 
    yield "%s: %d\n" % (key, realtime) 

¿Puede alguien sugerir qué otra manera puedo optimizar mi código mejor? ¡¡Gracias!!

Editado:

Aquí está el controlador de canalización:

class TimeCountPipeline(base_handler.PipelineBase): 
    """A pipeline to run Time count demo. 

    Args: 
    blobkey: blobkey to process as string. Should be a zip archive with 
     text files inside. 
    """ 

    def run(self, filekey, blobkey): 
    logging.debug("filename is %s" % filekey) 
    output = yield mapreduce_pipeline.MapreducePipeline(
     "time_count", 
     "main.time_count_map", 
     "main.time_count_reduce", 
     "mapreduce.input_readers.BlobstoreZipInputReader", 
     "mapreduce.output_writers.BlobstoreOutputWriter", 
     mapper_params={ 
      "blob_key": blobkey, 
     }, 
     reducer_params={ 
      "mime_type": "text/plain", 
     }, 
     shards=32) 
    yield StoreOutput("TimeCount", filekey, output) 

Mapreduce.yaml:

mapreduce: 
- name: Make messages lowercase 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.lower_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 
- name: Make messages upper case 
    params: 
    - name: done_callback 
    value: /done 
    mapper: 
    handler: main.upper_case_posts 
    input_reader: mapreduce.input_readers.DatastoreInputReader 
    params: 
    - name: entity_kind 
     default: main.Post 
    - name: processing_rate 
     default: 100 
    - name: shard_count 
     default: 4 

El resto de los archivos son exactamente los mismos que la demo.

He subido una copia de mis códigos en Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

+0

¿Puedes mostrar la configuración de mapreduce? Por alguna razón, parece que está pasando todo el archivo al asignador, en lugar de mapearlo línea por línea. –

+0

Hola Daniel, mi pregunta ha sido editada. Gracias, realmente lo aprecio! – autumngard

Respuesta

2

es probable que su archivo de entrada excede el límite de memoria blanda de tamaño. Para archivos grandes, use BlobstoreLineInputReader o BlobstoreZipLineInputReader.

Estos lectores de entrada pasan algo diferente a la función map, pasan el start_position en el archivo y la línea de texto.

Su map función podría ser algo como:

def time_count_map(data): 
    """Time count map function.""" 
    text = data[1] 

    try: 
     reader = csv.reader([text.replace('\0', '')], skipinitialspace=True) 
     for s in reader: 
      """Calculate time elapsed""" 
      sdw = s[1] 
      start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p") 
      edw = s[2] 
      end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p") 
      time_difference = time.mktime(end_date) - time.mktime(start_date) 
      yield (s[0], time_difference) 
    except IndexError, e: 
     logging.debug(e) 

Usando BlobstoreLineInputReader permitirá que se ejecute la tarea mucho más rápido, ya que puede utilizar más de un casco, hasta 256, pero significa que usted tiene que subir su archivos sin comprimir, lo que puede ser un dolor. Lo manejo cargando los archivos comprimidos en un servidor de Windows EC2, luego descomprimiendo y cargando desde allí, ya que el ancho de banda ascendente es tan grande.

+0

¡Esto funcionó muy bien para mí! ¡Muchas gracias! :) – autumngard

6

También considere llamar a gc.collect() en puntos regulares durante su código. He visto varias preguntas acerca de cómo exceder los límites de memoria suave que se aliviaban al llamar a gc.collect(), la mayoría de las cuales tienen que ver con blobstore.

+0

está llamando a gc.collect() solo se aplica a blobstore o en general? – marcadian

Cuestiones relacionadas