Estoy tratando de encontrar la mejor manera de usar agentes para consumir elementos de Message Queue (Amazon SQS). En este momento tengo una función (process-queue-item) que toma elementos de la cola y los procesa.Clojure agents consuming from a queue
Quiero procesar estos elementos al mismo tiempo, pero no puedo entender cómo controlar los agentes. Básicamente, quiero mantener a todos los agentes ocupados tanto como sea posible sin recurrir a muchos elementos de la cola y desarrollar una acumulación (haré que esto se ejecute en un par de máquinas, por lo que los artículos deben quedar en la cola hasta que son realmente necesarios).
¿Alguien puede darme algunos consejos para mejorar mi implementación?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
¿Hay alguna forma en que podría tratar el que Message Queue un seq y luego simplemente usa pmap para obtener la paralelización? –
@Alex Stoddard: en mi caso, el elemento de cola de proceso realmente bloquea en la red IO, por lo que no creo que pmap sea la elección correcta, ya que solo usa tantos hilos como la máquina tiene núcleos. – erikcw
@erikw: Claro, pero eso es solo un detalle de implementación de pmap (hilos = # códigos + 2). No hay razón por la que no puedas escribir una versión de pmap con un número parametrizado de subprocesos.Ver la primera línea de la fuente de pmap: (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) –