2011-09-21 7 views
9

Tengo que ejecutar tareas en aproximadamente 150k objetos Django. ¿Cuál es la mejor manera de hacer esto? Estoy usando el Django ORM como Broker. El back-end de la base de datos es MySQL y se cuelga y muere durante el task.delay() de todas las tareas. Relacionado, también quería comenzar con el envío de un formulario, pero la solicitud resultante produjo muy con un tiempo de respuesta largo que expiró.django/apio: ¿Mejores prácticas para ejecutar tareas en objetos Django de 150k?

+1

Esto es ** exactamente ** la pregunta que necesitaba. Muy apreciado. – mlissner

Respuesta

10

Me gustaría también considerar el uso de algo que no sea el uso de la base de datos como el "corredor". Realmente no es adecuado para este tipo de trabajo.

embargo, usted puede mover algunos de estos gastos indirectos del ciclo de petición/respuesta con el lanzamiento de una tarea de crear las otras tareas:

from celery.task import TaskSet, task 

from myapp.models import MyModel 

@task 
def process_object(pk): 
    obj = MyModel.objects.get(pk) 
    # do something with obj 

@task 
def process_lots_of_items(ids_to_process): 
    return TaskSet(process_object.subtask((id,)) 
         for id in ids_to_process).apply_async() 

Además, dado que es probable que no tenga 15000 transformadores de transformar todos estos objetos en paralelo, se puede dividir los objetos en trozos de decir de 100 o 1000:

from itertools import islice 
from celery.task import TaskSet, task 
from myapp.models import MyModel 

def chunks(it, n): 
    for first in it: 
     yield [first] + list(islice(it, n - 1)) 

@task 
def process_chunk(pks): 
    objs = MyModel.objects.filter(pk__in=pks) 
    for obj in objs: 
     # do something with obj 

@task 
def process_lots_of_items(ids_to_process): 
    return TaskSet(process_chunk.subtask((chunk,)) 
         for chunk in chunks(iter(ids_to_process), 
              1000)).apply_async() 
+0

¡Gracias, pregunta! Con @ApPeL y su sugerencia, pasé al broker de RabbitMQ y obtuve ganancias inmediatas. ¡Además, incorporar estas tareas estableció lo que estaba tratando de lograr! –

+0

¿Puedes comentar cómo funciona este código y cuántas tareas está generando? Lo he intentado durante los últimos 4 días y sigue bajando mi base de datos. Solo estoy utilizando un 80% de CPU cuando se está ejecutando, pero obteniendo "MySQL se ha ido" y conectando los errores después de que se ejecuta por un tiempo. –

+0

¿Parece que estás sobrecargando tu base de datos? ¿Almacena los resultados en la base de datos? Este es el valor predeterminado para django-apio. Si no los necesita, debe establecer '@task (ignore_result = True)' o deshabilitarlos globalmente con 'CELERY_IGNORE_RESULT = True' – asksol

1

utilizo beanstalkd (http://kr.github.com/beanstalkd/) como el motor. Agregar un trabajador y una tarea es bastante sencillo para Django si usa django-beanstalkd: https://github.com/jonasvp/django-beanstalkd/

Es muy confiable para mi uso.

Ejemplo del trabajador:

import os 
import time 

from django_beanstalkd import beanstalk_job 


@beanstalk_job 
def background_counting(arg): 
    """ 
    Do some incredibly useful counting to the value of arg 
    """ 
    value = int(arg) 
    pid = os.getpid() 
    print "[%s] Counting from 1 to %d." % (pid, value) 
    for i in range(1, value+1): 
     print '[%s] %d' % (pid, i) 
     time.sleep(1) 

Para iniciar un trabajo/trabajador/tarea:

from django_beanstalkd import BeanstalkClient 
client = BeanstalkClient() 

client.call('beanstalk_example.background_counting', '5') 

(fuente extraída de ejemplo de aplicación django-beanstalkd)

Enjoy!

Cuestiones relacionadas