2010-04-08 9 views
26

Estoy tratando de encontrar la mejor manera de usar agentes para consumir elementos de Message Queue (Amazon SQS). En este momento tengo una función (process-queue-item) que toma elementos de la cola y los procesa.Clojure agents consuming from a queue

Quiero procesar estos elementos al mismo tiempo, pero no puedo entender cómo controlar los agentes. Básicamente, quiero mantener a todos los agentes ocupados tanto como sea posible sin recurrir a muchos elementos de la cola y desarrollar una acumulación (haré que esto se ejecute en un par de máquinas, por lo que los artículos deben quedar en la cola hasta que son realmente necesarios).

¿Alguien puede darme algunos consejos para mejorar mi implementación?

(def active-agents (ref 0)) 

(defn process-queue-item [_] 
    (dosync (alter active-agents inc)) 
    ;retrieve item from Message Queue (Amazon SQS) and process 
    (dosync (alter active-agents dec))) 

(defn -main [] 
    (def agents (for [x (range 20)] (agent x))) 

    (loop [loop-count 0] 

    (if (< @active-agents 20) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent)) 
     ;should skip this agent until later if it is still busy processing (not sure how) 
     (send-off agent process-queue-item))) 

    ;(apply await-for (* 10 1000) agents) 
    (Thread/sleep 10000) 
    (logging/info (str "ACTIVE AGENTS " @active-agents)) 
    (if (> 10 loop-count) 
     (do (logging/info (str "done, let's cleanup " count)) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent))) 
     (apply await agents) 
     (shutdown-agents)) 
     (recur (inc count))))) 
+0

¿Hay alguna forma en que podría tratar el que Message Queue un seq y luego simplemente usa pmap para obtener la paralelización? –

+0

@Alex Stoddard: en mi caso, el elemento de cola de proceso realmente bloquea en la red IO, por lo que no creo que pmap sea la elección correcta, ya que solo usa tantos hilos como la máquina tiene núcleos. – erikcw

+0

@erikw: Claro, pero eso es solo un detalle de implementación de pmap (hilos = # códigos + 2). No hay razón por la que no puedas escribir una versión de pmap con un número parametrizado de subprocesos.Ver la primera línea de la fuente de pmap: (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) –

Respuesta

6

Lo que está pidiendo es una forma de seguir repartiendo tareas pero con un límite superior. Un enfoque simple para esto es usar un semáforo para coordinar el límite. Así es como lo abordaría:

(let [limit (.availableProcessors (Runtime/getRuntime)) 
     ; note: you might choose limit 20 based upon your problem description 
     sem (java.util.concurrent.Semaphore. limit)] 
    (defn submit-future-call 
    "Takes a function of no args and yields a future object that will 
    invoke the function in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [#^Callable task] 
    ; take a slot (or block until a slot is free) 
    (.acquire sem) 
    (try 
     ; create a future that will free a slot on completion 
     (future (try (task) (finally (.release sem)))) 
     (catch java.util.concurrent.RejectedExecutionException e 
     ; no task was actually submitted 
     (.release sem) 
     (throw e))))) 

(defmacro submit-future 
    "Takes a body of expressions and yields a future object that will 
    invoke the body in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [& body] `(submit-future-call (fn [] [email protected]))) 

#_(example 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    ;; blocks at this point for a 2 processor PC until the previous 
    ;; two futures complete 
    #<[email protected]: :pending> 
    ;; then submits the job 

Con eso en su lugar ahora solo necesita coordinar cómo se llevan a cabo las tareas. Parece que ya tienes los mecanismos para hacer eso. Bucle (enviar-futuro (proceso-cola-elemento))

4

¿Quizás podría utilizar la función seque? Citando (doc seque):

clojure.core/seque 
([s] [n-or-q s]) 
    Creates a queued seq on another (presumably lazy) seq s. The queued 
    seq will produce a concrete seq in the background, and can get up to 
    n items ahead of the consumer. n-or-q can be an integer n buffer 
    size, or an instance of java.util.concurrent BlockingQueue. Note 
    that reading from a seque can block if the reader gets ahead of the 
    producer. 

Lo que tengo en mente es una secuencia perezosa que elementos de la cola en la red; Envuelva esto en seque, póngalo en un Ref y haga que los Agentes trabajadores consuman artículos de este seque. seque devuelve algo que se parece a un seq normal desde el punto de vista de su código, con la magia de cola sucediendo de forma transparente. Tenga en cuenta que si la secuencia que inserta se fragmenta, se forzará un fragmento a la vez. También tenga en cuenta que la llamada inicial al seque parece bloquearse hasta que se obtenga un elemento inicial o dos (o un fragmento, según sea el caso; creo que tiene más que ver con la forma en que las secuencias perezosas funcionan que con el propio seque).

Un bosquejo del código (un realmente uno incompletos, no se ha probado en absoluto):

(defn get-queue-items-seq [] 
    (lazy-seq 
    (cons (get-queue-item) 
     (get-queue-items-seq)))) 

(def task-source (ref (seque (get-queue-items-seq)))) 

(defn do-stuff [] 
    (let [worker (agent nil)] 
    (if-let [result 
      (dosync 
       (when-let [task (first @task-source)] 
       (send worker (fn [_] (do-stuff-with task)))))] 
     (do (await worker) 
      ;; maybe do something with worker's state 
      (do-stuff))))) ;; continue working 

(defn do-lots-of-stuff [] 
    (let [fs (doall (repeatedly 20 #(future (do-stuff))))] 
    fs))) 

En realidad, usted probablemente querrá un productor más compleja de la SEC elemento de cola por lo que se puede pedir dejar de producir nuevos elementos (una necesidad si todo se puede cerrar con gracia, los futuros morirán cuando la fuente de tareas se seque, use future-done? para ver si ya lo han hecho). Y eso es algo que puedo ver a primera vista ... Estoy seguro de que hay más cosas que pulir aquí. Creo que el enfoque general funcionaría, sin embargo.

+0

He agregado una corrección a la última línea del bosquejo del código por el cual se crearán los futuros. (Algo crucial para la idea en general, realmente ... :-)) –

+0

Estoy tratando de entender este código. ¿Por qué la tarea-fuente es una referencia? No pareces alterarlo en ningún momento. –

+0

@ Siddhartha Reddy: A primera vista, diría que esta es la razón por la que llamé al código "* realmente * incompleto". ;-) Supongo que necesitaría '(alter task-source rest)' (o 'next') en' when-let' dentro de 'dosync' para ser útil. En realidad, al pensar en esto de nuevo, me pregunto si usar 'seque' aquí es una buena idea después de todo; me parece ahora que aumenta el número de elementos de la cola que se perderían en caso de un bloqueo de la máquina local (ya que 'seque' tira elementos antes de que los trabajadores los soliciten). Por otra parte, en algunos escenarios podría ser bueno para el rendimiento; eso es –

23
(let [switch (atom true) ; a switch to stop workers 
     workers (doall 
       (repeatedly 20 ; 20 workers pulling and processing items from SQS 
        #(future (while @switch 
          (retrieve item from Amazon SQS and process)))))] 
    (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-) 
    (reset! switch false) ; stop ! 
    (doseq [worker workers] @worker)) ; waiting for all workers to be done 
+2

Esto ya no funciona con 1.4 ('future' y' future-call' no devuelven 'IFn', que' repetidamente' requiere). Sin embargo, puede envolver fácilmente un futuro en una función, anteponiendo '(future' with' # ', –

+3

@AlexB, buena captura, ni siquiera es un problema de 1.4: el # debería haber estado allí. Reparé el código, ¡Gracias! – cgrand

0

No sabe cómo idiomática esto es, como todavía soy un novato con el idioma, pero la solución siguiente funciona para mí:

(let [number-of-messages-per-time 2 
     await-timeout 1000] 
    (doseq [p-messages (partition number-of-messages-per-time messages)] 
    (let [agents (map agent p-messages)] 
     (doseq [a agents] (send-off a process)) 
     (apply await-for await-timeout agents) 
     (map deref agents))))