2011-06-27 18 views
17

Estoy intentando consumir un servicio web de forma asíncrona, ya que demora hasta 45 segundos en volver. Desafortunadamente, este servicio web también es poco confiable y puede generar errores. Configuré django-celery y tengo la ejecución de mis tareas, que funciona bien hasta que la tarea falla más allá de max_retries.Falló la recuperación de la tarea más allá de max_retries

Esto es lo que tengo hasta ahora:

@task(default_retry_delay=5, max_retries=10) 
def request(xml): 
    try: 
     server = Client('https://www.whatever.net/RealTimeService.asmx?wsdl') 
     xml = server.service.RunRealTimeXML(
      username=settings.WS_USERNAME, 
      password=settings.WS_PASSWORD, 
      xml=xml 
     ) 
    except Exception, e: 
     result = Result(celery_id=request.request.id, details=e.reason, status="i") 
     result.save() 
     try: 
      return request.retry(exc=e) 
     except MaxRetriesExceededError, e: 
      result = Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f") 
      result.save() 
      raise 
    result = Result(celery_id=request.request.id, details=xml, status="s") 
    result.save() 
    return result 

Desafortunadamente, MaxRetriesExceededError no está siendo lanzada por retry(), así que no estoy seguro de cómo manejar el fracaso de esta tarea. Django ya ha devuelto HTML al cliente, y estoy verificando el contenido de Result a través de AJAX, que nunca llega a un estado de falla total f.

Entonces la pregunta es: ¿Cómo puedo actualizar mi base de datos cuando la tarea de Apio ha excedido max_retries?

Respuesta

14

Puede reemplazar el método after_return de la clase de tareas apio, este método se llama después de la ejecución de la tarea cualquiera que sea el estado de ret (éxito, fracasado, RETRY)

class MyTask(celery.task.Task) 

    def run(self, xml, **kwargs) 
     #Your stuffs here 

    def after_return(self, status, retval, task_id, args, kwargs, einfo=None): 
     if self.max_retries == int(kwargs['task_retries']): 
      #If max retries are equals to task retries do something 
     if status == "FAILURE": 
      #You can do also something if the tasks fail instead of check the retries 

http://readthedocs.org/docs/celery/en/latest/reference/celery.task.base.html#celery.task.base.BaseTask.after_return

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return

+0

Desde el enlace al parecer es obsoleto en la actualidad, [aquí es una nueva] (http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return) – rschwieb

+0

Gracias, respuesta actualizada. –

15

Con la versión 2.3.2 de apio este enfoque ha funcionado bien para mí:

class MyTask(celery.task.Task): 
    abstract = True 

    def after_return(self, status, retval, task_id, args, kwargs, einfo): 
     if self.max_retries == self.request.retries: 
      #If max retries is equal to task retries do something 

@task(base=MyTask, default_retry_delay=5, max_retries=10) 
def request(xml): 
    #Your stuff here 
6

Solo voy con esto por ahora, me ahorra el trabajo de subclasar tareas y me resulta fácil de entender.

# auto-retry with delay as defined below. After that, hook is disabled. 
@celery.shared_task(bind=True, max_retries=5, default_retry_delay=300) 
def post_data(self, hook_object_id, url, event, payload): 
    headers = {'Content-type': 'application/json'} 
    try: 
     r = requests.post(url, data=payload, headers=headers) 
     r.raise_for_status() 
    except requests.exceptions.RequestException as e: 
     if self.request.retries >= self.max_retries: 
      log.warning("Auto-deactivating webhook %s for event %s", hook_object_id, event) 
      Webhook.objects.filter(object_id=hook_object_id).update(active=False) 
      return False 
     raise self.retry(exc=e) 
    return True 
8

El problema es que el apio está intentando volver a plantear la excepción que pasó cuando alcanza el límite de reintento. El código para hacer esto re-elevación está aquí: https://github.com/celery/celery/blob/v3.1.20/celery/app/task.py#L673-L681

La manera más simple evitar este problema es simplemente no tener el apio administrar sus excepciones en absoluto:

@task(max_retries=10) 
def mytask(): 
    try: 
     do_the_thing() 
    except Exception as e: 
     try: 
      mytask.retry() 
     except MaxRetriesExceededError: 
      do_something_to_handle_the_error() 
      logger.exception(e) 
+1

Esta es la solución correcta para el problema original. –

Cuestiones relacionadas