Estoy trabajando en la función appengine-mapreduce y he modificado la demostración para que se ajuste a mi propósito. Básicamente tengo un millón de líneas en el siguiente formato: ID de usuario, hora1, hora2. Mi propósito es encontrar la diferencia entre time1 y time2 para cada userid.Memoria límite golpeado con appengine-mapreduce
Sin embargo, mientras corro esto en Google App Engine, me encontré con este mensaje de error en la sección de registros:
superado el límite de memoria privada suave con 180.56 MB después de dar servicio 130 pedidos Todos los bien el manejo de esta solicitud, el Se encontró que el proceso que manejó esta solicitud usaba demasiada memoria y se terminó. Es probable que esto provoque que se use un nuevo proceso para la siguiente solicitud a su aplicación. Si ve este mensaje con frecuencia, puede tener una pérdida de memoria en su aplicación.
def time_count_map(data):
"""Time count map function."""
(entry, text_fn) = data
text = text_fn()
try:
q = text.split('\n')
for m in q:
reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
for s in reader:
"""Calculate time elapsed"""
sdw = s[1]
start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
edw = s[2]
end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
time_difference = time.mktime(end_date) - time.mktime(start_date)
yield (s[0], time_difference)
except IndexError, e:
logging.debug(e)
def time_count_reduce(key, values):
"""Time count reduce function."""
time = 0.0
for subtime in values:
time += float(subtime)
realtime = int(time)
yield "%s: %d\n" % (key, realtime)
¿Puede alguien sugerir qué otra manera puedo optimizar mi código mejor? ¡¡Gracias!!
Editado:
Aquí está el controlador de canalización:
class TimeCountPipeline(base_handler.PipelineBase):
"""A pipeline to run Time count demo.
Args:
blobkey: blobkey to process as string. Should be a zip archive with
text files inside.
"""
def run(self, filekey, blobkey):
logging.debug("filename is %s" % filekey)
output = yield mapreduce_pipeline.MapreducePipeline(
"time_count",
"main.time_count_map",
"main.time_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=32)
yield StoreOutput("TimeCount", filekey, output)
Mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
params:
- name: done_callback
value: /done
mapper:
handler: main.lower_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
- name: Make messages upper case
params:
- name: done_callback
value: /done
mapper:
handler: main.upper_case_posts
input_reader: mapreduce.input_readers.DatastoreInputReader
params:
- name: entity_kind
default: main.Post
- name: processing_rate
default: 100
- name: shard_count
default: 4
El resto de los archivos son exactamente los mismos que la demo.
He subido una copia de mis códigos en Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
¿Puedes mostrar la configuración de mapreduce? Por alguna razón, parece que está pasando todo el archivo al asignador, en lugar de mapearlo línea por línea. –
Hola Daniel, mi pregunta ha sido editada. Gracias, realmente lo aprecio! – autumngard