2012-07-16 10 views
13

Tengo una cadena de apio que ejecuta algunas tareas. Cada una de las tareas puede fallar y volver a intentarlo. A continuación encontrará un ejemplo rápido:Volviendo a intentar las tareas fallidas del apio que forman parte de una cadena

from celery import task 

@task(ignore_result=True) 
def add(x, y, fail=True): 
    try: 
     if fail: 
      raise Exception('Ugly exception.') 
     print '%d + %d = %d' % (x, y, x+y) 
    except Exception as e: 
     raise add.retry(args=(x, y, False), exc=e, countdown=10) 

@task(ignore_result=True) 
def mul(x, y): 
    print '%d * %d = %d' % (x, y, x*y) 

y la cadena:

from celery.canvas import chain 
chain(add.si(1, 2), mul.si(3, 4)).apply_async() 

Ejecución de las dos tareas (y suponiendo que nada falla), su conseguiría/Véase impresa:

1 + 2 = 3 
3 * 4 = 12 

Sin embargo, cuando la tarea de agregar falla la primera vez y tiene éxito en llamadas de reintentos posteriores, el resto de las tareas de la cadena no se ejecutan, es decir, la tarea de agregar falla, todas las demás tareas de la cadena no se ejecutan y después ew segundos, la tarea de agregar se ejecuta nuevamente y tiene éxito y el resto de las tareas de la cadena (en este caso mul.si (3, 4)) no se ejecuta.

¿El apio proporciona una manera de continuar las cadenas fallidas de la tarea que falló, en adelante? De lo contrario, ¿cuál sería el mejor enfoque para lograr esto y asegurarse de que las tareas de una cadena se ejecuten en el orden especificado y solo después de que la tarea anterior se haya ejecutado correctamente incluso si la tarea se reintenta varias veces?

Nota 1: El problema puede ser resuelto haciendo

add.delay(1, 2).get() 
mul.delay(3, 4).get() 

, pero estoy interesado en entender por qué las cadenas no funcionan con las tareas fallidas.

Respuesta

0

También estoy interesado en comprender por qué las cadenas no funcionan con tareas fallidas.

cavo un código apio y lo que he encontrado hasta ahora es:

La aplicación happends en app.builtins.py

@shared_task 
def add_chain_task(app): 
    from celery.canvas import chord, group, maybe_subtask 
    _app = app 

    class Chain(app.Task): 
     app = _app 
     name = 'celery.chain' 
     accept_magic_kwargs = False 

     def prepare_steps(self, args, tasks): 
      steps = deque(tasks) 
      next_step = prev_task = prev_res = None 
      tasks, results = [], [] 
      i = 0 
      while steps: 
       # First task get partial args from chain. 
       task = maybe_subtask(steps.popleft()) 
       task = task.clone() if i else task.clone(args) 
       i += 1 
       tid = task.options.get('task_id') 
       if tid is None: 
        tid = task.options['task_id'] = uuid() 
       res = task.type.AsyncResult(tid) 

       # automatically upgrade group(..) | s to chord(group, s) 
       if isinstance(task, group): 
        try: 
         next_step = steps.popleft() 
        except IndexError: 
         next_step = None 
       if next_step is not None: 
        task = chord(task, body=next_step, task_id=tid) 
       if prev_task: 
        # link previous task to this task. 
        prev_task.link(task) 
        # set the results parent attribute. 
        res.parent = prev_res 

       results.append(res) 
       tasks.append(task) 
       prev_task, prev_res = task, res 

      return tasks, results 

     def apply_async(self, args=(), kwargs={}, group_id=None, chord=None, 
       task_id=None, **options): 
      if self.app.conf.CELERY_ALWAYS_EAGER: 
       return self.apply(args, kwargs, **options) 
      options.pop('publisher', None) 
      tasks, results = self.prepare_steps(args, kwargs['tasks']) 
      result = results[-1] 
      if group_id: 
       tasks[-1].set(group_id=group_id) 
      if chord: 
       tasks[-1].set(chord=chord) 
      if task_id: 
       tasks[-1].set(task_id=task_id) 
       result = tasks[-1].type.AsyncResult(task_id) 
      tasks[0].apply_async() 
      return result 

     def apply(self, args=(), kwargs={}, **options): 
      tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']] 
      res = prev = None 
      for task in tasks: 
       res = task.apply((prev.get(),) if prev else()) 
       res.parent, prev = prev, res 
      return res 
    return Chain 

Se puede ver que al final prepare_stepsprev_task está vinculado a la siguiente tarea. Cuando fallaron las prev_task, no se llama a la siguiente tarea.

estoy probando con la adición de la link_error de la tarea anterior a la siguiente:

if prev_task: 
    # link and link_error previous task to this task. 
    prev_task.link(task) 
    prev_task.link_error(task) 
    # set the results parent attribute. 
    res.parent = prev_res 

Pero entonces, la siguiente tarea debe hacerse cargo de los dos casos (tal vez, excepto cuando está configurado para ser inmutable, por ejemplo, no aceptar más argumentos).

Creo que la cadena puede soportar que al permitir una sintaxis le gusta esto:

c = chain(t1, (t2, t1e), (t3, t2e))

que significa:

t1link a t2 y link_error a t1e

t2link a t3 y link_error a t2e

+0

Decidí usar una tarea similar a una cadena que ejecuta todas las tareas que de otro modo estarían en una cadena, pero espera a que una tarea finalice antes de iniciar la otra, por ejemplo: 'task1.delay ([params]). obtener(); task2.delay ([params]). get(); task3.delay ([params]). get() '. La tarea similar a una cadena puede detectar las excepciones planteadas por cualquiera de las tareas y volver a intentarlo. – Andrei

+0

Entonces, de su ejemplo, t1e y t2e tendrían que llamar a t2 y, respectivamente, t3, ¿verdad? – Andrei

+0

El ejemplo es solo mi opinión sobre la posible sintaxis para la cadena. Significa que cada tarea siguiente ahora es de hecho un par de tareas, se llamará al primer elemento del par si no se produce una excepción/error en el paso anterior, y el segundo elemento es el controlador de excepción/error para el error del paso anterior. 't1e' significa' cadena de errores 't1' – anh

Cuestiones relacionadas