2012-10-04 15 views
5

Tengo un pequeño consumidor/editor de Clojure que recibe mensajes, los procesa y los envía a otros consumidores, todo a través de RabbitMQ.Clojure manejo de mensajes/asincrónico, multiproceso

He definido un controlador de mensajes que maneja los mensajes en un hilo separado (separado del hilo principal que es). Como se puede ver en el código siguiente, el hilo recibe y envía mensajes de manera síncrona, todo sucede en un bucle de evento iniciado por la función lcm/subscribe.

Entonces, la pregunta es, ¿cuál sería la "forma Clojure" para crear un grupo de subprocesos de tamaño N de estos manipuladores de mensajes síncronos? Supongo que el modo no Clojure sería generar manualmente una cantidad de hilos a través de la interoperabilidad de Java.

Además, ¿eso agilizaría el procesamiento de mensajes, teniendo en cuenta que el procesamiento no requiere mucha CPU? ¿Sería mejor volver asincrónica a estos manejadores de mensajes, teniendo en cuenta que dedica más tiempo a la publicación que al procesamiento?

Y, por último, ¿cómo voy a medir el rendimiento de estos enfoques contendientes (vengo del mundo de Ruby/Javascript, y no hay ningún multihilo ahí)?

NOTA: Sé que todo esto podría evitarse simplemente escalado horizontal y desove más procesos JVM escuchando el bus de mensajes, pero ya que la aplicación va a ser desplegado en Heroku, me gustaría utilizar como muchos recursos como sea posible en cada dyno/proceso.

(defn message-handler 
    [ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message))) 

(defn -main 
    [& args] 
    (let [conn   (rmq/connect {:uri (System/getenv "MSGQ")}) 
     ch   (lch/open conn) 
     q-name  "q.events.tagger" 
     e-sub-name "e.events.preproc" 
     e-pub-name "e.events" 
     routing-key "tasks.taggify"] 
    (lq/declare ch q-name :exclusive false :auto-delete false) 
    (le/declare ch e-pub-name "fanout" :durable false) 
    (lq/bind ch q-name e-sub-name :routing-key routing-key) 
    (.start (Thread. (fn [] 
         (lcm/subscribe ch q-name message-handler :auto-ack true)))))) 

En una nota más básica ... ¿cómo iba a ir sobre refactorización el código para apoyar el registro de la devolución de llamada mensaje manipulador con un argumento adicional, así:

(.start (Thread. (fn [] 
         (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true)))))) 

y a continuación, publicar con una referencia:

(lb/publish ch pub-name "" processed-message))) 

en lugar de un literal:

(lb/publish ch "e.events" "" processed-message))) 

Respuesta

2

Para la segunda parte de la pregunta, puede utilizar la aplicación parcial como se muestra a continuación:

(defn message-handler 
    [pub-name ch metadata ^bytes payload] 
    (let [msg (json/parse-string (String. payload "UTF-8")) 
     processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message))) 



(.start 
    (Thread. 
    (fn [] 
     (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true)))))) 
1

Este es un tema muy amplio, y puede considerar dividir esta pregunta en varias preguntas distintas, pero la respuesta concisa es: use agents.

+0

Gracias por la punta, lo hará. – neektza