2011-09-13 11 views
8

El módulo de productor de mi aplicación lo ejecutan los usuarios que desean enviar el trabajo a realizar en un pequeño clúster. Envía las suscripciones en formato JSON a través del intermediario de mensajes RabbitMQ.¿Cuál es el mejor patrón para diseñar una aplicación RPC asíncrona usando Python, Pika y AMQP?

He intentado varias estrategias, y la mejor hasta el momento es la siguiente, que todavía no es totalmente funcional:

Cada máquina de clúster se ejecuta un módulo de consumo, que se suscribe en sí a la cola AMQP y emite un prefetch_count para decirle al agente cuántas tareas puede ejecutar a la vez.

Pude hacer que funcione usando SelectConnection de la biblioteca de Pika AMQP. Tanto el consumidor como el productor inician dos canales, uno conectado a cada cola. El productor envía solicitudes al canal [A] y espera respuestas en el canal [B], y el consumidor espera solicitudes en el canal [A] y envía respuestas en el canal [B]. Parece, sin embargo, que cuando el consumidor ejecuta la devolución de llamada que calcula la respuesta, bloquea, por lo que solo ejecuto una tarea en cada consumidor en cada momento.

Lo que necesito en el final:

  1. el consumidor [A] suscribe sus tareas (alrededor de 5k cada vez) para el cluster
  2. el corredor despacha N mensajes/solicitudes para cada consumidor, donde N es el número de tareas simultáneas que puede manejar
  3. cuando se termina una sola tarea, el consumidor responde al intermediario/productor con el resultado
  4. el productor recibe las respuestas, actualiza el estado de cálculo y, al final, imprime algunos informes

Restricciones:

  • Si otro usuario envía el trabajo, todas sus tareas se pondrán en cola después de que el usuario anterior (supongo que esto es cierto automáticamente desde el sistema de colas, pero no he pensado sobre las implicaciones en un entorno de rosca)
  • tareas tienen que ser presentadas una orden, pero el orden en que se contestan no es importante

UP FECHA

He estudiado un poco más y mi problema real parece ser que utilizo una función simple como devolución de llamada a la función SelectConnection.channel.basic_consume() de pika. Mi última idea (no implementada) es pasar una función de subprocesamiento, en lugar de una función regular, para que la devolución de llamada no se bloquee y el consumidor pueda seguir escuchando.

+0

¡el problema es muy similar a lo que encuentro! –

Respuesta

0

Su configuración me parece bien. Y tiene razón, puede simplemente configurar la devolución de llamada para iniciar un hilo y encadenarlo a una devolución de llamada separada cuando el hilo termine de poner la respuesta en cola por el Canal B.

Básicamente, sus consumidores deben tener una cola propia (tamaño de N, cantidad de paralelismo que admiten). Cuando llega una solicitud a través del canal A, debe almacenar el resultado en la cola compartida entre el hilo principal con Pika y los hilos de trabajo en el grupo de subprocesos. Tan pronto como se ponga en cola, pika debería responder con ACK, y su cadena de trabajo se activaría y comenzaría a procesarse.

Una vez que el trabajador haya terminado con su trabajo, cola el resultado en una cola de resultados separada y emitirá una devolución de llamada al hilo principal para enviarlo de vuelta al consumidor.

Debe tener cuidado y asegurarse de que los subprocesos de trabajo no interfieren entre sí si están utilizando recursos compartidos, pero ese es un tema aparte.

0

Al ser inexperto en el enhebrado, mi configuración podría ejecutar múltiples procesos de consumo (el número de los cuales es básicamente su conteo de búsqueda previa). Cada uno se conectaría a las dos colas y procesarían los trabajos felizmente, desconociendo la existencia de cada uno.

2

Como habrás notado, tu proceso se bloquea cuando ejecuta una devolución de llamada. Hay varias maneras de lidiar con esto dependiendo de lo que hace su devolución de llamada.

Si está obligado IO-su devolución de llamada (haciendo un montón de trabajo en red o disco IO) se puede utilizar cualquiera de los hilos o una solución basada en greenlet, como gevent, eventlet, o greenhouse. Tenga en cuenta, sin embargo, que Python está limitado por el GIL (Global Interpreter Lock), lo que significa que solo se ejecuta una parte del código python en un solo proceso de python. Esto significa que si hace muchos cálculos con el código python, estas soluciones probablemente no sean mucho más rápidas de lo que ya tiene.

Otra opción sería implementar su consumidor como procesos múltiples usando multiprocessing. He encontrado que el multiprocesamiento es muy útil cuando se trabaja en paralelo. Puede implementar esto usando Queue, teniendo el proceso principal como consumidor y cultivando el trabajo para sus hijos, o simplemente iniciando múltiples procesos que cada uno consume por sí mismo. Sugeriría, a menos que su aplicación sea altamente concurrente (miles de trabajadores), simplemente iniciar varios trabajadores, cada uno de los cuales consume de su propia conexión. De esta forma, puede usar la función de acuse de recibo de AMQP, de modo que si un consumidor muere mientras está procesando una tarea, el mensaje se envía de vuelta a la cola automáticamente y será recogido por otro trabajador, en lugar de simplemente perder la solicitud.

Una última opción, si controlas el productor y también está escrito en Python, es utilizar una biblioteca de tareas como celery para abstraer el funcionamiento de tareas/cola para usted. He usado apio para varios proyectos grandes y he encontrado que está muy bien escrito. También manejará los múltiples problemas del consumidor para usted con la configuración adecuada.

+0

+1 por mencionar el apio –

Cuestiones relacionadas