2011-07-05 13 views
6

Me gustaría tener tareas de apio que dependen del resultado de 2 o más tareas. He examinado Python+Celery: Chaining jobs? y http://pypi.python.org/pypi/celery-tasktree, pero solo son válidos si las tareas solo tienen una tarea dependiente.Ejecución de tareas de apio con el gráfico de dependencia

Conozco TaskSet, pero no parece haber una manera de ejecutar instantáneamente una devolución de llamada cuando TaskSetResult.ready() pasa a ser True. Lo que tengo en mente ahora es tener una tarea periódica que sondee TaskSetResult.ready() cada pocos [milli] segundos aproximadamente y active la devolución de llamada a medida que devuelve True, pero eso me suena bastante poco elegante.

¿Alguna sugerencia?

Respuesta

2

mrbox es cierto, puedes volver a intentarlo hasta que los resultados estén listos, pero no está tan claro en los documentos que cuando reintentas tienes que pasar los elementos setid y subtareas, y para recuperarlo debes usar el mapa función, a continuación hay un código de muestra para explicar lo que quiero decir.

def run(self, setid=None, subtasks=None, **kwargs): 

    if not setid or not subtasks: 
     #Is the first time that I launch this task, I'm going to launch the subtasks 
     … 
     tasks = [] 
     for slice in slices: 
      tasks.append(uploadTrackSlice.subtask((slice,folder_name))) 

     job = TaskSet(tasks=tasks) 
     task_set_result = job.apply_async() 
     setid = task_set_result.taskset_id 
     subtasks = [result.task_id for result in task_set_result.subtasks] 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 

    #Is a retry than we just have to check the results   
    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks)) 
    if not tasks_result.ready(): 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 
    else:  
     if tasks_result.successful(): 
      return tasks_result.join() 
     else: 
      raise Exception("Some of the tasks was failing") 
2

mi humilde opinión, que puede hacer algo similar a lo hecho en docs- método de reintento link

O puede utilizar con MAX_RETRIES = Ninguno - si una de las tareas de la 'base' ready() es falsa, puede dispara el método .retry() hasta que se completen las dos tareas 'base'.

7

En las versiones recientes de apio (3.0 o superior) se puede utilizar una llamada de acordes para lograr el efecto deseado:

De http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives:

acorde simple

La primitiva de acordes nos permite agregar una devolución de llamada para que se llame cuando todas las de las tareas de un grupo hayan terminado de ejecutarse, lo que a menudo es requerido para los algoritmos th por lo que no son vergonzosamente paralelas:

>>> from celery import chord 
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() 
>>> res.get() 
90 

de responsabilidad: No he probado esto por mí mismo todavía.

Cuestiones relacionadas