2011-12-14 16 views
33

estoy usando Celery para gestionar tareas asíncronas. Ocasionalmente, sin embargo, el proceso de apio disminuye y no se ejecuta ninguna de las tareas. Me gustaría poder verificar el estado del apio y asegurarme de que todo funciona bien, y si detecto algún problema, mostraré un mensaje de error al usuario. De la documentación Trabajador Apio parece que podría ser capaz de utilizar ping o inspect para esto, pero de ping siente hacky y no está claro exactamente cómo inspeccionar está destinado a ser utilizado (si inspeccionar(). Registrado() está vacía?).detectar si apio está disponible/Ejecución

Cualquier orientación sobre este sería apreciada. Básicamente lo que estoy buscando es un método de este modo:

def celery_is_alive(): 
    from celery.task.control import inspect 
    return bool(inspect().registered()) # is this right?? 

EDIT: No siquiera se parece a domicilio() está disponible en el apio 2.3.3 (a pesar de que los 2,1 lista de documentos que). Quizás ping es la respuesta correcta.

EDIT: Ping también no parece hacer lo que pensaba que iba a hacer, por lo que aún no está seguro de la respuesta aquí.

+0

¿La respuesta a continuación no funciona para usted? Como alguien que tiene un problema similar para resolver, me gustaría algo de confirmación. – kojiro

Respuesta

44

Aquí está el código que he estado usando. celery.task.control.Inspect.stats() devuelve un dict que contiene muchos detalles sobre los trabajadores actualmente disponibles, Ninguno si no hay trabajadores en ejecución, o plantea un IOError si no se puede conectar con el intermediario de mensajes. Estoy usando RabbitMQ: es posible que otros sistemas de mensajería se comporten de forma ligeramente diferente. Esto funcionó en Celery 2.3.x y 2.4.x; No estoy seguro de cuánto tiempo más atrás.

def get_celery_worker_status(): 
    ERROR_KEY = "ERROR" 
    try: 
     from celery.task.control import inspect 
     insp = inspect() 
     d = insp.stats() 
     if not d: 
      d = { ERROR_KEY: 'No running Celery workers were found.' } 
    except IOError as e: 
     from errno import errorcode 
     msg = "Error connecting to the backend: " + str(e) 
     if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': 
      msg += ' Check that the RabbitMQ server is running.' 
     d = { ERROR_KEY: msg } 
    except ImportError as e: 
     d = { ERROR_KEY: str(e)} 
    return d 
+0

Funcionó para mí :) – kojiro

+6

Descubrí que lo anterior agrega dos colas reply.celery.pidbox a rabbitmq cada vez que se ejecuta. Esto conduce a un aumento incremental en el uso de la memoria de rabbitmq. – kojiro

2

Los siguientes trabajó para mí:

import socket 
from kombu import Connection 

celery_broker_url = "amqp://localhost" 

try: 
    conn = Connection(celery_broker_url) 
    conn.ensure_connection(max_retries=3) 
except socket.error: 
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url)) 
+2

Estoy bastante seguro de que esto tendrá éxito si rabbitmq se ejecuta independientemente del estado del apio. Pero este es un buen control para hacer si el apio no sabe si la falla es con rabbitmq u otra cosa. –

Cuestiones relacionadas