2010-02-25 6 views
15

Tengo un script de Python ejecutando Django para la base de datos y Memcache, pero está ejecutando notablemente como daemon independiente (es decir, no responde a las solicitudes del servidor web). El daemon comprueba una solicitud de modelo Django para objetos con un status=STATUS_NEW, luego los marca STATUS_WORKING y los coloca en una cola.Python/Django sondeo de base de datos tiene pérdida de memoria

Varios procesos (creados con el paquete de multiprocesos) sacarán cosas de la cola y trabajarán en la solicitud con el pr.id que se pasó a la cola. Creo que la pérdida de memoria es, probablemente, en el siguiente código (pero podría ser en el código 'trabajador' en el otro lado de la cola, aunque esto es poco probable, porque debido a que el tamaño de la memoria está creciendo incluso cuando no hay Solicitudes están subiendo - es decir, cuando todos los trabajadores están bloqueando en Queue.get()).

from requisitions.models import Requisition # our Django model 
from multiprocessing import Queue 

while True: 
    # Wait for "N"ew requisitions, then pop them into the queue. 
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW): 
     pr.set_status(pr.STATUS_WORKING) 
     pr.save() 
     queue.put(pr.id) 

    time.sleep(settings.DAEMON_POLL_WAIT) 

Donde settings.DAEMON_POLL_WAIT=0.01.

Parece que si salgo de esta corriente por un período de tiempo (es decir, un par de días) el proceso de Python crecerá a tamaño infinito y, finalmente, el sistema se quede sin memoria.

lo que está pasando aquí (o cómo puedo averiguar), y lo más importante - ¿cómo se puede ejecutar un demonio que hace esto?

Mi primer pensamiento es para cambiar la dinámica de la función, en particular, poniendo el cheque de nueva Solicitud objetos en un django.core.cache cache, es decir

from django.core.cache import cache 

while True: 
    time.sleep(settings.DAEMON_POLL_WAIT) 
    if cache.get('new_requisitions'): 
     # Possible race condition 
     cache.clear() 
     process_new_requisitions(queue) 

def process_new_requisitions(queue): 
    for pr in Requisition.objects.all().filter(status=Requisition.STATUS_NEW): 
     pr.set_status(pr.STATUS_WORKING) 
     pr.save() 
     queue.put(pr.id) 

El proceso que está creando Solicitudes con status=STATUS_NEW puede hacer un cache.set('new_requisitions', 1) (o como alternativa, podríamos capturar una señal o evento Requisition.save() donde se está creando una nueva solicitud y luego establecer la bandera en la memoria caché desde allí).

Sin embargo, no estoy seguro de que la solución que he propuesto aborde los problemas de memoria (que probablemente estén relacionados con la recolección de basura, por lo que el alcance por medio del process_new_requisitions puede resolver el problema).

estoy agradecido por cualquier pensamientos y comentarios.

+5

Es sólo una idea ... ¿Usted está funcionando con debug = true en settings.py? El modo de depuración guarda todas las consultas, lo que sin duda puede parecer una pérdida de memoria :) –

+0

Heh heh. ¡Lo fui, de hecho! Me olvide de eso. Lo he apagado (y me he movido a la solución de caché) y la pérdida de memoria parece haber disminuido. –

Respuesta

35

tendrá que restablecer periódicamente una lista de las consultas que Django mantiene para fines de depuración. Normalmente se borra después de cada petición, pero dado que su aplicación no está solicitud basada, tiene que hacerlo de forma manual:

from django import db 

db.reset_queries() 

Ver también:

  • "Debugging Django memory leak with TrackRefs and Guppy" por Mikko Ohtamaa:

    Django realiza un seguimiento de todas las consultas para depuración propósitos (connection.queries). Esta lista es reseteada al final de la solicitud HTTP. Pero en el modo independiente, no hay solicitudes .Así que hay que manualmente reajuste a las consultas de la lista después de cada ciclo trabajo

  • "Why is Django leaking memory?" in Django FAQ - se habla tanto sobre la configuración DEBUG a False, que es siempre importante, y sobre la eliminación de la lista de consultas utilizando db.reset_queries(), importante en aplicaciones como la suya.

+0

Esas son referencias sólidas: Gracias. –

+0

Parece que el enlace se ha movido a: http://opensourcehacker.com/2008/03/07/debugging-django-memory-leak-with-trackrefs-and-guppy/ – joctee

+0

@joctee Gracias, he actualizado la publicación. –

5

¿El archivo settings.py para el proceso de demonio tiene DEBUG = True? Si es así, Django guarda en la memoria un registro de todos los SQL que ha ejecutado hasta el momento, lo que puede provocar una pérdida de memoria.

+0

¡Gran sugerencia! Fue habilitado; Lo he deshabilitado También cambié a la verificación de caché, por lo que no está sondeando la base de datos incesantemente. Veremos si estas dos cosas ayudan. ¡Gracias! –

1

Aparte de db.reset_queries() y depurar = trucos falsos, aquí es otro enfoque: Sólo desovar otro proceso que realiza la consulta Django y alimenta la cola. Este proceso funcionará en su propio contexto de memoria, y después de realizar su tarea liberará la memoria.

Creo que a veces (si no siempre) es inevitable controlar los problemas de memoria con un proceso de larga ejecución que realiza transacciones pesadas de django.

2

Tuve una gran cantidad de datos para hacer, por lo que mi solución a este problema era usar multiprocesamiento y usar pools para contrarrestar cualquier inflamación que sucediera.

Para simplificar, acabo de definir algunas funciones "globales" (nivel superior, cualquiera que sea el término en Python) en lugar de tratar de hacer las cosas más fáciles de recortar.

Aquí es en forma de resumen:

import multiprocessing as mp 

WORKERS = 16 # I had 7 cores, allocated 16 because processing was I/O bound 

# this is a global function 
def worker(params): 
    # do stuff 
    return something_for_the_callback_to_analyze 

# this is a global function 
def worker_callback(worker_return_value): 
    # report stuff, or pass 

# My multiprocess_launch was inside of a class 
def multiprocess_launcher(params): 
    # somehow define a collection 
    while True: 
    if len(collection) == 0: 
     break 
    # Take a slice 
    pool_sub_batch = [] 
    for _ in range(WORKERS): 
     if collection: # as long as there's still something in the collection 
     pool_sub_batch.append(collection.pop()) 
    # Start a pool, limited to the slice 
    pool_size = WORKERS 
    if len(pool_sub_batch) < WORKERS: 
     pool_size = len(pool_sub_batch) 
    pool = mp.Pool(processes=pool_size) 
    for sub_batch in pool_sub_batch: 
     pool.apply_async(worker, args = (sub_batch), callback = worker_callback) 
    pool.close() 
    pool.join() 
    # Loop, more slices