2012-03-20 12 views
5

Estoy un poco confundido sobre cómo debería ser mi configuración para configurar un intercambio de temas.Intercambio de temas con Apio y RabbitMQ

http://www.rabbitmq.com/tutorials/tutorial-five-python.html

Esto es lo que me gustaría lograr:

Task1 -> send to QueueOne and QueueFirehose 
Task2 -> sent to QueueTwo and QueueFirehose 

a continuación:

Task1 -> consume from QueueOne 
Task2 -> consume from QueueTwo 
TaskFirehose -> consume from QueueFirehose 

sólo quiero Tarea1 a consumir a partir QueueOne y Task2 a consumir de QueueTwo.

Ese problema ahora es que cuando se ejecutan Task1 y 2, también agotan QueueFirehose, y la tarea TaskFirehose nunca se ejecuta.

¿Hay algún problema con mi configuración o estoy malinterpretando algo?

CELERY_QUEUES = { 
    "QueueOne": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.one", 
    }, 
    "QueueTwo": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.two", 
    }, 
    "QueueFirehose": { 
     "exchange_type": "topic", 
     "binding_key": "pipeline.#", 
    }, 
} 

CELERY_ROUTES = { 
     "tasks.task1": { 
      "queue": 'QueueOne', 
      "routing_key": 'pipeline.one', 
     }, 
     "tasks.task2": { 
      "queue": 'QueueTwo', 
      "routing_key": 'pipeline.two', 
     }, 
     "tasks.firehose": { 
      'queue': 'QueueFirehose', 
      "routing_key": 'pipeline.#', 
     }, 
} 
+0

Quizás esta sea solo una terminología para aclarar, pero su descripción parece que está combinando tareas y trabajadores. Por ejemplo, diga "Tarea2 enviada a Queue2" y luego diga "Tarea2 para consumir desde Queue2". Las tareas no consumen; son consumidos (por los trabajadores). También dice "La tarea TaskFirehose nunca se ejecuta", pero en su descripción, no se está enviando TaskFirehose a ninguna cola. El concepto básico es: las tareas se envían a las colas; y los trabajadores ejecutan tareas desde las colas que les asignaron. ¡Tareas! = Los trabajadores que las ejecutan. –

Respuesta

0

Suponiendo que en realidad quería decir algo como esto:

Task1 -> send to QueueOne 
Task2 -> sent to QueueTwo 
TaskFirehose -> send to QueueFirehose 

a continuación:

Worker1 -> consume from QueueOne, QueueFirehose 
Worker2 -> consume from QueueTwo, QueueFirehose 
WorkerFirehose -> consume from QueueFirehose 

Esto podría no ser exactamente lo que quería decir, pero creo que debería cubrir muchos escenarios y espero que sea tuyo también Algo como esto debería funcionar:

# Advanced example starting 10 workers in the background: 
# * Three of the workers processes the images and video queue 
# * Two of the workers processes the data queue with loglevel DEBUG 
# * the rest processes the default' queue. 

$ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data 
-Q default -L:4,5 DEBUG 

Para obtener más opciones y referencia: http://celery.readthedocs.org/en/latest/reference/celery.bin.multi.html

Esto era directamente de la documentación.

Yo también tuve una situación similar, y lo abordé de una manera ligeramente diferente. No pude usar apio múltiple con supervisord. Entonces, en su lugar, creé múltiples programas en supervisión para cada trabajador. Los trabajadores estarán en diferentes procesos de todos modos, así que solo deje que el supervisor se encargue de todo por usted. El archivo de configuración se ve algo como: -

; ================================== 
; celery worker supervisor example 
; ================================== 

[program:Worker1] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueOne, QueueFirehose 

directory=/path/to/project 
user=nobody 
numprocs=1 
stdout_logfile=/var/log/celery/worker1.log 
stderr_logfile=/var/log/celery/worker1.log 
autostart=true 
autorestart=true 
startsecs=10 

; Need to wait for currently executing tasks to finish at shutdown. 
; Increase this if you have very long running tasks. 
stopwaitsecs = 600 

; When resorting to send SIGKILL to the program to terminate it 
; send SIGKILL to its whole process group instead, 
; taking care of its children as well. 
killasgroup=true 

; if rabbitmq is supervised, set its priority higher 
; so it starts first 
priority=998 

Del mismo modo, para worker2 y WorkerFirehose, editar las líneas correspondientes para hacer:

[program:Worker2] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueTwo, QueueFirehose 

y

[program:WorkerFirehose] 
; Set full path to celery program if using virtualenv 
command=celery worker -A proj --loglevel=INFO -Q QueueFirehose 

todos ellos incluyen en supervisord archivo .conf y eso debería hacerlo.

Cuestiones relacionadas