2011-07-19 14 views
8

Estoy intentando crear una aplicación de chat simple usando AMQP, Websockets y Ruby. Entiendo que este puede no ser el mejor caso de uso para entender AMQP, pero me gustaría entender dónde me estoy equivocando.AMQP creando suscripciones a colas de forma dinámica

La siguiente es mi código amqp-servidor

require 'rubygems' 
require 'amqp' 
require 'mongo' 
require 'em-websocket' 
require 'json' 

class MessageParser 
    # message format => "room:harry_potter, nickname:siddharth, room:members" 
    def self.parse(message) 
    parsed_message = JSON.parse(message) 

    response = {} 
    if parsed_message['status'] == 'status' 
     response[:status] = 'STATUS' 
     response[:username] = parsed_message['username'] 
     response[:roomname] = parsed_message['roomname'] 
    elsif parsed_message['status'] == 'message' 
     response[:status] = 'MESSAGE' 
     response[:message] = parsed_message['message'] 
     response[:roomname] = parsed_message['roomname'].split().join('_') 
    end 

    response 
    end 
end 

class MongoManager 
    def self.establish_connection(database) 
    @db ||= Mongo::Connection.new('localhost', 27017).db(database) 
    @db.collection('rooms') 

    @db 
    end 
end 


@sockets = [] 
EventMachine.run do 
    connection = AMQP.connect(:host => '127.0.0.1') 
    channel = AMQP::Channel.new(connection) 

    puts "Connected to AMQP broker. #{AMQP::VERSION} " 

    mongo = MongoManager.establish_connection("trackertalk_development") 

    EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| 
    socket_detail = {:socket => ws} 
    ws.onopen do 
     @sockets << socket_detail 

    end 

    ws.onmessage do |message| 

     status = MessageParser.parse(message)   
     exchange = channel.fanout(status[:roomname].split().join('_')) 

     if status[:status] == 'STATUS'    
     queue = channel.queue(status[:username], :durable => true) 

     unless queue.subscribed? 
     puts "--------- SUBSCRIBED --------------" 
     queue.bind(exchange).subscribe do |payload| 
      puts "PAYLOAD : #{payload}" 
      ws.send(payload) 
      end 
     else 
      puts "----ALREADY SUBSCRIBED" 
     end     

     # only after 0.8.0rc14 
     #queue = channel.queue(status[:username], :durable => true)  
     #AMQP::Consumer.new(channel, queue)   

     elsif status[:status] == 'MESSAGE' 
     puts "********************* Message- published ******************************" 
     exchange.publish(status[:message) 
     end     
    end 

    ws.onclose do 
     @sockets.delete ws 
    end 
    end  
end 

utilizo el estado para indicar si el mensaje entrante es un mensaje de chat en curso o por un mensaje de estado que requiere que maneje las tareas, como suscribirse a la cola .

El problema que enfrentamos es que cuando envío un mensaje como socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

El exchange.publish' is called but it still doesn't get pushed via the ws.send` al navegador.

¿Hay algo fundamentalmente erróneo en mi comprensión de EventMachine y AMQP?

Aquí es la pezonera para el mismo código http://pastie.org/private/xosgb8tw1w5vuroa4w7a

Mi código parece funcionar como se desea cuando quito la durable => true de queue = channel.queue(status[:username], :durable => true)

El siguiente es un fragmento de mis rieles de vista que identifica el nombre del usuario y el nombre de la habitación y lo envía como parte del mensaje a través de Websockets.

Aunque el código parece funcionar cuando elimino el durable => true no entiendo por qué eso afecta el mensaje que se está entregando. Ignora la parte de mongo ya que todavía no reproduce ninguna parte.

También me gustaría saber si mi acercamiento a AMQP y su uso es correcto

<script> 
    $(document).ready(function(){ 
     var username = '<%= @user.email %>'; 
     var roomname = 'Bazingaa'; 

     socket = new WebSocket('ws://127.0.0.1:8080/'); 

     socket.onopen = function(msg){ 
      console.log('connected'); 
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname})); 
     } 

     socket.onmessage = function(msg){ 
      $('#chat-log').append(msg.data); 

     } 

    }); 

</script> 
<div class='block'> 
    <div class='content'> 
    <h2 class='title'><%= @room.name %></h2> 
    <div class='inner'> 
     <div id="chat-log"> 
     </div> 

     <div id="chat-console"> 
     <textarea rows="5" cols="40"></textarea> 
     </div> 
    </div> 
    </div> 
</div> 

<style> 
    #chat-log{ 
     color:#000; 
     font-weight:bold; 
     margin-top:1em; 
     width:900px; 
     overflow:auto; 
     height:300px; 
    } 
    #chat-console{ 
     bottom:10px; 
    } 

    textarea{ 
     width:100%; 
     height:60px; 
    } 
</style> 
+0

Podría alguien también decirme cómo organizar mi código en el entorno de producción si tuviera que ejecutar mi código amqp como daemon. Cualquier código de muestra que me ayude a organizar mi código será de gran ayuda. – Sid

Respuesta

1

Creo que su problema podría ser que la cola se detiene en el intermediario entre las invocaciones de ws.onmessage. Cuando el cliente vuelve a conectar la cola y ya existe el enlace, no se llama a ws.send().

De forma predeterminada, al crear una cola, ésta y cualquier vinculación que tenga, se mantiene hasta que el broker se reinicie, o le dice explícitamente al intermediario que la elimine.

Hay dos formas de cambiar esto:

  • Adición de la duradera bandera cuando se crea la cola, lo que hará que la cola para pegar alrededor, incluso si el agente se reinicia
  • Adición de la auto_delete marca, que hará que el intermediario elimine automáticamente la entidad después de un corto período de tiempo sin tener un consumidor adjunto

Si tiene control sobre el intermediario que está utilizando el agente de cuentas rabbitmq, una forma fácil de introspección de lo que está sucediendo en el intermediario es instalar el management plugin, que proporciona una interfaz web para intercambios, enlaces y colas en el intermediario.

0

En la primera se ven los bits AMQP parecen estar bien, pero no desee configurar todo el dependencias. Si proporciona un ejemplo mínimo con solo la parte de AMQP, lo verificaré.

+0

En mi código de Rails identifico la identificación de correo electrónico del usuario y el nombre de la habitación y la paso al código del servidor de AMQP a través de Websockets. Aunque el código parece funcionar cuando elimino durable => true no entiendo por qué eso afecta el mensaje que se está entregando. Ignora la parte de mongo ya que todavía no reproduce ninguna parte. También me gustaría saber si mi enfoque de AMQP y su uso es correcto – Sid

Cuestiones relacionadas