2012-10-01 14 views
9

Tengo una implementación de RabbitMQ existente que indica que algunas aplicaciones Java están utilizando los mensajes de registro de envío como objetos de cadena JSON en varios canales. Me gustaría utilizar Aplery para consumir estos mensajes y escribirlos en varios lugares (por ejemplo, DB, Hadoop, etc.).Uso de Aplery con mensajes RabbitMQ existentes

Veo que el apio está diseñado para ser tanto el productor como el consumidor de los mensajes RabbitMQ, ya que trata de ocultar el mecanismo por el cual se entregan esos mensajes. ¿Hay alguna forma de lograr que Aplery consuma mensajes creados por otra aplicación y ejecute trabajos cuando lleguen?

Respuesta

12

Actualmente es difícil agregar consumidores personalizados a los trabajadores del apio, pero esto está cambiando en la versión de desarrollo (para pasar a ser 3.1) donde agregué soporte para los pasos de arranque del consumidor.

No hay documentación sin embargo, como acabo de terminar su aplicación, pero en este caso es un ejemplo:

from celery import Celery 
from celery.bin import Option 
from celery.bootsteps import ConsumerStep 
from kombu import Consumer, Exchange, Queue 

class CustomConsumer(ConsumerStep): 
    queue = Queue('custom', Exchange('custom'), routing_key='custom') 

    def __init__(self, c, enable_custom_consumer=False, **kwargs): 
     self.enable = self.enable_custom_consumer 

    def get_consumers(self, connection): 
     return [ 
      Consumer(connection.channel(), 
       queues=[self.queue], 
       callbacks=[self.on_message]), 
     ] 

    def on_message(self, body, message): 
     print('GOT MESSAGE: %r' % (body,)) 
     message.ack() 


celery = Celery(broker='amqp://localhost//') 
celery.steps['consumer'].add(CustomConsumer) 
celery.user_options['worker'].add(
    Option('--enable-custom-consumer', action='store_true', 
      help='Enable our custom consumer.'), 
) 

Tenga en cuenta que la API puede cambiar en la versión final, una cosa que todavía no estoy seguro de sobre cómo se manejan los canales después de get_consumer(connection). Actualmente, el canal del consumidor se cierra cuando se pierde la conexión, y en el momento del cierre, , pero las personas pueden querer manejar los canales manualmente. En ese caso, siempre existe la posibilidad de personalizar ConsumerStep o escribir un nuevo StartStopStep.

+3

La documentación ahora se puede encontrar en http://celery.readthedocs.org/en/latest/userguide/extending.html –

Cuestiones relacionadas