2011-06-14 10 views
40

Estoy usando apio-acebo para iniciar una tarea principal que inicia una serie de tareas secundarias. Ya tengo ambas tareas escritas.Tarea de apio que ejecuta más tareas

¿Hay alguna manera de hacer esto fácilmente? ¿El apio permite que las tareas se ejecuten desde dentro de las tareas?

Mi ejemplo:

@task 
def compute(users=None): 
    if users is None: 
     users = User.objects.all() 

    tasks = [] 
    for user in users: 
     tasks.append(compute_for_user.subtask((user.id,))) 

    job = TaskSet(tasks) 
    job.apply_async() # raises a IOError: Socket closed 

@task 
def compute_for_user(user_id): 
    #do some stuff 

compute se llama a partir de celerybeat, pero causa una IOError cuando se trata de correr apply_async. ¿Algunas ideas?

+0

http://celeryproject.org/docs/userguide/tasksets.html – bdd

+0

puede un taskset se inició desde el interior de una tarea? –

+6

Las tareas y los conjuntos de tareas se pueden aplicar desde dentro de una tarea, pero nunca debe esperar por sus resultados (vea http://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks) – asksol

Respuesta

23

Para responder a sus preguntas de apertura: a partir de la versión 2.0, Celery proporciona una manera fácil de comenzar tareas desde otras tareas. Lo que está llamando "tareas secundarias" es lo que llama "subtareas". Consulte la documentación para Sets of tasks, Subtasks and Callbacks, que @Paperino tuvo la amabilidad de vincular.

Para la versión 3.0, Celery cambió a usar groups para este y otros tipos de comportamiento.

Su código muestra que ya está familiarizado con esta interfaz. Su pregunta real parece ser: "¿Por qué recibo un 'Socket Closed' IOError cuando intento ejecutar mi conjunto de subtareas?" No creo que nadie pueda responder eso, porque no ha proporcionado suficiente información sobre su programa. Su fragmento no se puede ejecutar tal como está, por lo que no podemos examinar el problema que está teniendo para nosotros. Por favor, publique stacktrace provisto con el IOError, y con un poco de suerte, alguien que pueda ayudarle con su crasher vendrá.

6

se puede usar algo como esto (Apoyo en 3,0)

g = group(compute_for_user.s(user.id) for user in users) 
g.apply_async() 
+0

Entonces, en su implementación, el método "calcular (usuarios = Ninguno):" no es necesario en absoluto, ¿sí? –

+1

Esto no se recomienda si desea esperar a que se completen las subtareas y obtener el resultado. – Siddharth

Cuestiones relacionadas