2012-09-06 11 views
8

Tengo una aplicación que reacciona a los mensajes enviados por los clientes. Un mensaje es reload_credentials, que la aplicación recibe cada vez que se registra un nuevo cliente. Este mensaje se conectará a una base de datos PostgreSQL, realizará una consulta de todas las credenciales y luego las almacenará en un hash Ruby normal (client_id => client_token).¿Cómo debo manejar este caso de uso usando EventMachine?

Algunos otros mensajes que la aplicación puede recibir son start, stop, pause que se utilizan para realizar un seguimiento de algunos tiempos de sesión. Mi punto es que preveo la aplicación que funciona de la siguiente manera:

  • cliente envía un mensaje
  • mensaje se pone en cola
  • cola se está procesando

Sin embargo, por ejemplo, no me No quiero bloquear el reactor. Además, imaginemos que tengo un mensaje reload_credentials que está próximo en cola. No quiero que se procese ningún otro mensaje de la cola hasta que las credenciales se vuelvan a cargar desde el DB. Además, mientras proceso un determinado mensaje (como esperar a que finalice la consulta de credenciales), deseo permitir que otros mensajes se pongan en cola.

Podría usted por favor me guía hacia la solución de un problema? Estoy pensando que debo usar em-synchrony, pero no estoy seguro.

Respuesta

7

Utilice uno de los conductores Postgresql EM, o EM.defer para que no se bloquee el reactor.

Cuando recibe el mensaje '' reload_credentials sólo tapa una bandera que hace que todos los mensajes posteriores a encolar. Una vez que el 'reload_credentials' haya finalizado, procesa todos los mensajes de la cola. Después de que la cola esté vacía, voltee la bandera que hace que los mensajes se procesen a medida que se reciben.

conductores EM para PostgreSQL se enumeran aquí: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server 
    def post_init 
    @queue    = [] 
    @loading_credentials = false 
    end 

    def recieve_message(type, data) 
    return @queue << [type, data] if @loading_credentials || [email protected]? 
    return process_msg(type, data) unless :reload_credentials == type 
    @loading_credentials = true 
    reload_credentials do 
     @loading_credentials = false 
     process_queue 
    end 
    end 

    def reload_credentials(&when_done) 
    EM.defer(proc { query_and_load_credentials }, when_done) 
    end 


    def process_queue 
    while (type, data = @queue.shift) 
     process_msg(type, data) 
    end 
    end 

    # lots of other methods 
end 

EM.start_server(HOST, PORT, Server) 

Si desea que todas las conexiones a la cola de mensajes cada vez que una conexión recibe un mensaje '' reload_connections que tendrá que coordinar a través de los eigenclass.

+0

Pero, el mensaje reload_credentials podría recibirse varias veces. ¿No debería haber como 2 hilos? Uno que sigue haciendo cola y otro que está procesando? – Geo

+0

Sí, si se recibe reload_credentials mientras se está procesando reload_credentials, se pondrá en cola como otros mensajes. – simulacre

+0

Múltiples mensajes de reload_credentials deben manejarse como el primero. Al colocar las reload_credentials en un bloque EM.defer lo está ejecutando en otro hilo. Mientras su código de 'procesamiento' no sea bloqueado, seguirá recibiendo mensajes. Use bibliotecas compatibles con EM para asegurarse de no bloquear. Alternativamente, use EM.defer para el procesamiento. – simulacre

4

El siguiente es supongo, algo así como su implementación actual:

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 


    q = EM::Queue.new 

    workers = Array.new(10) { Worker.new q } 

El problema anterior, si he entendido bien, es que usted no quiere que los trabajadores que trabajan en nuevos empleos (puestos de trabajo que se han llegado antes en la línea de tiempo del productor) que cualquier trabajo de reload_credentials. Lo siguiente debe dar servicio a esto (palabras de precaución adicionales al final).

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 

    class LockingDispatcher 
     def initialize channel, queue 
     @channel = channel 
     @queue = queue 

     @backlog = [] 
     @channel.subscribe method(:dispatch_with_locking) 

     @locked = false 
     end 

     def dispatch_with_locking item 
     if locked? 
      @backlog << item 
     else 
      # You probably want to move the specialization here out into a method or 
      # block that's passed into the constructor, to make the lockingdispatcher 
      # more of a generic processor 
      case item.type 
      when :reload_credentials 
      lock 
      deferrable = CredentialReloader.new(item).start 
      deferrable.callback { unlock } 
      deferrable.errback { unlock } 
      else 
      dispatch_without_locking item 
      end 
     end 
     end 

     def dispatch_without_locking item 
     @queue << item 
     end 

     def locked? 
     @locked 
     end 

     def lock 
     @locked = true 
     end 

     def unlock 
     @locked = false 
     bl = @backlog.dup 
     @backlog.clear 
     bl.each { |item| dispatch_with_locking item } 
     end 

    end 

    channel = EM::Channel.new 
    queue = EM::Queue.new 

    dispatcher = LockingDispatcher.new channel, queue 

    workers = Array.new(10) { Worker.new queue } 

Por lo tanto, la entrada al primer sistema entra en q, pero en este nuevo sistema se trata en el channel. El queue todavía se utiliza para la distribución del trabajo entre los trabajadores, pero la queue no se rellena mientras una operación de actualización de credenciales está pasando. Por desgracia, ya que no toman más tiempo, no he generalizado la LockingDispatcher tal que no se acopla con el tipo de elemento y el código para el envío de CredentialsReloader. Te lo dejo a ti.

Debe tener en cuenta que si bien esto es lo que entiendo de su solicitud original, generalmente es mejor relajar este tipo de requisitos. Hay varios problemas pendientes que en esencia no puede ser erradicado sin alteraciones en ese requisito:

  • El sistema no espera a que la ejecución de los trabajos para completar antes de comenzar los trabajos de credenciales
  • El sistema se encargará de ráfagas de credenciales empleos muy mal - otros artículos que pueden ser procesables, no lo serán.
  • En el caso de un error en el código de credenciales, el retraso acumulado podría llenar el ram y provocar un error. Un tiempo de espera simple podría ser suficiente para evitar efectos catastróficos, si el código es abortable, y los mensajes subsiguientes son lo suficientemente procesables como para evitar más bloqueos.

Parece que tiene alguna noción de usuario en el sistema. Si piensa en sus requisitos, es posible que solo necesite acumular elementos pertenecientes a un ID de usuario cuyas credenciales estén en estado de actualización. Este es un problema diferente, que implica un tipo diferente de despacho. Pruebe una cantidad acumulada de atrasos bloqueados para esos usuarios, con una devolución de llamada al completar la credencial para drenar esos atrasos en los trabajadores, o algún arreglo similar.

Buena suerte!

Cuestiones relacionadas