El módulo de productor de mi aplicación lo ejecutan los usuarios que desean enviar el trabajo a realizar en un pequeño clúster. Envía las suscripciones en formato JSON a través del intermediario de mensajes RabbitMQ.¿Cuál es el mejor patrón para diseñar una aplicación RPC asíncrona usando Python, Pika y AMQP?
He intentado varias estrategias, y la mejor hasta el momento es la siguiente, que todavía no es totalmente funcional:
Cada máquina de clúster se ejecuta un módulo de consumo, que se suscribe en sí a la cola AMQP y emite un prefetch_count para decirle al agente cuántas tareas puede ejecutar a la vez.
Pude hacer que funcione usando SelectConnection de la biblioteca de Pika AMQP. Tanto el consumidor como el productor inician dos canales, uno conectado a cada cola. El productor envía solicitudes al canal [A] y espera respuestas en el canal [B], y el consumidor espera solicitudes en el canal [A] y envía respuestas en el canal [B]. Parece, sin embargo, que cuando el consumidor ejecuta la devolución de llamada que calcula la respuesta, bloquea, por lo que solo ejecuto una tarea en cada consumidor en cada momento.
Lo que necesito en el final:
- el consumidor [A] suscribe sus tareas (alrededor de 5k cada vez) para el cluster
- el corredor despacha N mensajes/solicitudes para cada consumidor, donde N es el número de tareas simultáneas que puede manejar
- cuando se termina una sola tarea, el consumidor responde al intermediario/productor con el resultado
- el productor recibe las respuestas, actualiza el estado de cálculo y, al final, imprime algunos informes
Restricciones:
- Si otro usuario envía el trabajo, todas sus tareas se pondrán en cola después de que el usuario anterior (supongo que esto es cierto automáticamente desde el sistema de colas, pero no he pensado sobre las implicaciones en un entorno de rosca)
- tareas tienen que ser presentadas una orden, pero el orden en que se contestan no es importante
UP FECHA
He estudiado un poco más y mi problema real parece ser que utilizo una función simple como devolución de llamada a la función SelectConnection.channel.basic_consume() de pika. Mi última idea (no implementada) es pasar una función de subprocesamiento, en lugar de una función regular, para que la devolución de llamada no se bloquee y el consumidor pueda seguir escuchando.
¡el problema es muy similar a lo que encuentro! –