Actualmente es difícil agregar consumidores personalizados a los trabajadores del apio, pero esto está cambiando en la versión de desarrollo (para pasar a ser 3.1) donde agregué soporte para los pasos de arranque del consumidor.
No hay documentación sin embargo, como acabo de terminar su aplicación, pero en este caso es un ejemplo:
from celery import Celery
from celery.bin import Option
from celery.bootsteps import ConsumerStep
from kombu import Consumer, Exchange, Queue
class CustomConsumer(ConsumerStep):
queue = Queue('custom', Exchange('custom'), routing_key='custom')
def __init__(self, c, enable_custom_consumer=False, **kwargs):
self.enable = self.enable_custom_consumer
def get_consumers(self, connection):
return [
Consumer(connection.channel(),
queues=[self.queue],
callbacks=[self.on_message]),
]
def on_message(self, body, message):
print('GOT MESSAGE: %r' % (body,))
message.ack()
celery = Celery(broker='amqp://localhost//')
celery.steps['consumer'].add(CustomConsumer)
celery.user_options['worker'].add(
Option('--enable-custom-consumer', action='store_true',
help='Enable our custom consumer.'),
)
Tenga en cuenta que la API puede cambiar en la versión final, una cosa que todavía no estoy seguro de sobre cómo se manejan los canales después de get_consumer(connection)
. Actualmente, el canal del consumidor se cierra cuando se pierde la conexión, y en el momento del cierre, , pero las personas pueden querer manejar los canales manualmente. En ese caso, siempre existe la posibilidad de personalizar ConsumerStep o escribir un nuevo StartStopStep.
La documentación ahora se puede encontrar en http://celery.readthedocs.org/en/latest/userguide/extending.html –