Tengo un pequeño consumidor/editor de Clojure que recibe mensajes, los procesa y los envía a otros consumidores, todo a través de RabbitMQ.Clojure manejo de mensajes/asincrónico, multiproceso
He definido un controlador de mensajes que maneja los mensajes en un hilo separado (separado del hilo principal que es). Como se puede ver en el código siguiente, el hilo recibe y envía mensajes de manera síncrona, todo sucede en un bucle de evento iniciado por la función lcm/subscribe.
Entonces, la pregunta es, ¿cuál sería la "forma Clojure" para crear un grupo de subprocesos de tamaño N de estos manipuladores de mensajes síncronos? Supongo que el modo no Clojure sería generar manualmente una cantidad de hilos a través de la interoperabilidad de Java.
Además, ¿eso agilizaría el procesamiento de mensajes, teniendo en cuenta que el procesamiento no requiere mucha CPU? ¿Sería mejor volver asincrónica a estos manejadores de mensajes, teniendo en cuenta que dedica más tiempo a la publicación que al procesamiento?
Y, por último, ¿cómo voy a medir el rendimiento de estos enfoques contendientes (vengo del mundo de Ruby/Javascript, y no hay ningún multihilo ahí)?
NOTA: Sé que todo esto podría evitarse simplemente escalado horizontal y desove más procesos JVM escuchando el bus de mensajes, pero ya que la aplicación va a ser desplegado en Heroku, me gustaría utilizar como muchos recursos como sea posible en cada dyno/proceso.
(defn message-handler
[ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
processed-message (process msg)]
(lb/publish ch "e.events" "" processed-message)))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name message-handler :auto-ack true))))))
En una nota más básica ... ¿cómo iba a ir sobre refactorización el código para apoyar el registro de la devolución de llamada mensaje manipulador con un argumento adicional, así:
(.start (Thread. (fn []
(lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))
y a continuación, publicar con una referencia:
(lb/publish ch pub-name "" processed-message)))
en lugar de un literal:
(lb/publish ch "e.events" "" processed-message)))
Gracias por la punta, lo hará. – neektza