2011-07-14 86 views
6

Soy nuevo en RabbitMQ. Quiero poder leer mensajes sin bloquear cuando hay varias colas (para leer). ¿Alguna información sobre cómo puedo hacer eso?Lectura de múltiples colas, RabbitMQ

// EDIT 1

public class Rabbit : IMessageBus 
{ 

    private List<string> publishQ = new List<string>(); 
    private List<string> subscribeQ = new List<string>(); 

    ConnectionFactory factory = null; 
    IConnection connection = null; 
    IModel channel = null; 
    Subscription sub = null; 

    public void writeMessage(Measurement m1) { 
     byte[] body = Measurement.AltSerialize(m1); 
     int msgCount = 1; 
     Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id); 

     string finalQueue = publishToQueue(m1.id); 

     while (msgCount --> 0) { 
      channel.BasicPublish("amq.direct", finalQueue, null, body); 
     } 

     Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id); 
    } 

    public string publishToQueue(string firstQueueName) { 
     Console.WriteLine("Creating a queue and binding it to amq.direct"); 
     string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null); 
     channel.QueueBind(queueName, "amq.direct", queueName, null); 
     Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName); 
     return queueName; 
    } 


    public Measurement readMessage() { 
     Console.WriteLine("Receiving message..."); 
     Measurement m = new Measurement(); 

     int i = 0; 
     foreach (BasicDeliverEventArgs ev in sub) { 
      m = Measurement.AltDeSerialize(ev.Body); 
      //m.id = //get the id here, from sub 
      if (++i == 1) 
       break; 
      sub.Ack(); 
     } 

     Console.WriteLine("Done.\n"); 
     return m; 
    } 


    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
    } 

    public static string MsgSysName; 
    public string MsgSys 
    { 
     get 
     { 
      return MsgSysName; 
     } 
     set 
     { 
      MsgSysName = value; 
     } 
    } 

    public Rabbit(string _msgSys) //Constructor 
    { 
     factory = new ConnectionFactory(); 
     factory.HostName = "localhost"; 
     connection = factory.CreateConnection(); 
     channel = connection.CreateModel(); 
     //consumer = new QueueingBasicConsumer(channel); 

     System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
     MsgSys = _msgSys; 
    } 

    ~Rabbit() 
    { 
     //observer?? 
     connection.Dispose(); 
     //channel.Dispose(); 
     System.Console.WriteLine("\nDestroying RABBIT"); 
    } 
} 

// EDIT 2

private List<Subscription> subscriptions = new List<Subscription>(); 
    Subscription sub = null; 

public Measurement readMessage() 
    { 
     Measurement m = new Measurement(); 
     foreach(Subscription element in subscriptions) 
     { 
      foreach (BasicDeliverEventArgs ev in element) { 
       //ev = element.Next(); 
       if(ev != null) { 
        m = Measurement.AltDeSerialize(ev.Body); 
        return m; 
       } 
       m = null; 
      }   
     } 
     System.Console.WriteLine("No message in the queue(s) at this time."); 
     return m; 
    } 

    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
     subscriptions.Add(sub);  
    } 

// Editar 3

//MessageHandler.cs 

public class MessageHandler 
{ 
    // Implementation of methods for Rabbit class go here 
    private List<string> publishQ = new List<string>(); 
    private List<string> subscribeQ = new List<string>(); 

    ConnectionFactory factory = null; 
    IConnection connection = null; 
    IModel channel = null; 
    QueueingBasicConsumer consumer = null; 

    private List<Subscription> subscriptions = new List<Subscription>(); 
    Subscription sub = null; 

    public void writeMessage (Measurement m1) 
    { 
     byte[] body = Measurement.AltSerialize(m1); 
     //declare a queue if it doesn't exist 
     publishToQueue(m1.id); 

     channel.BasicPublish("amq.direct", m1.id, null, body); 
     Console.WriteLine("\n [x] Sent to queue {0}.", m1.id); 
    } 

    public void publishToQueue(string queueName) 
    { 
     string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null); 
     channel.QueueBind(finalQueueName, "amq.direct", "", null); 
    } 

    public Measurement readMessage() 
    { 
     Measurement m = new Measurement(); 
     foreach(Subscription element in subscriptions) 
     { 
      if(element.QueueName == null) 
      { 
       m = null; 
      } 
      else 
      { 
       BasicDeliverEventArgs ev = element.Next(); 
       if(ev != null) { 
        m = Measurement.AltDeSerialize(ev.Body); 
        m.id = element.QueueName; 
        element.Ack(); 
        return m; 
       } 
       m = null;      
      } 
      element.Ack(); 
     } 
     System.Console.WriteLine("No message in the queue(s) at this time."); 
     return m; 
    } 

    public void subscribeToQueue(string queueName) 
    { 
     sub = new Subscription(channel, queueName); 
     subscriptions.Add(sub); 
    } 

    public static string MsgSysName; 
    public string MsgSys 
    { 
     get 
     { 
      return MsgSysName; 
     } 
     set 
     { 
      MsgSysName = value; 
     } 
    } 

    public MessageHandler(string _msgSys) //Constructor 
    { 
     factory = new ConnectionFactory(); 
     factory.HostName = "localhost"; 
     connection = factory.CreateConnection(); 
     channel = connection.CreateModel(); 
     consumer = new QueueingBasicConsumer(channel); 

     System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
     MsgSys = _msgSys; 
    } 

    public void disposeAll() 
    { 
     connection.Dispose(); 
     channel.Dispose(); 
     foreach(Subscription element in subscriptions) 
     { 
      element.Close(); 
     } 
     System.Console.WriteLine("\nDestroying RABBIT"); 
    } 
} 

//App1.cs

using System; 
using System.IO; 

using UtilityMeasurement; 
using UtilityMessageBus; 


public class MainClass 
{ 
    public static void Main() 
    { 

    MessageHandler obj1 = MessageHandler("Rabbit"); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    //Create new Measurement messages 
    Measurement m1 = new Measurement("q1", 2345, 23.456); 
    Measurement m2 = new Measurement("q2", 222, 33.33); 

    System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id); 
    System.Console.WriteLine(" Time: {0}", m1.time); 
    System.Console.WriteLine(" Value: {0}", m1.value); 

    System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id); 
    System.Console.WriteLine(" Time: {0}", m2.time); 
    System.Console.WriteLine(" Value: {0}", m2.value); 

    // Ask queue name and store it 
    System.Console.WriteLine("\nName of queue to publish to: "); 
    string queueName = (System.Console.ReadLine()).ToString(); 
    obj1.publishToQueue(queueName); 

    // Write message to the queue 
    obj1.writeMessage(m1);  

    System.Console.WriteLine("\nName of queue to publish to: "); 
    string queueName2 = (System.Console.ReadLine()).ToString(); 
    obj1.publishToQueue(queueName2); 

    obj1.writeMessage(m2); 

    obj1.disposeAll(); 
} 
} 

//App2.cs

using System; 
using System.IO; 

using UtilityMeasurement; 
using UtilityMessageBus; 

public class MainClass 
{ 
    public static void Main() 
    { 
    //Asks for the message system 
    System.Console.WriteLine("\nEnter name of messageing system: "); 
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]"); 
    string MsgSysName = (System.Console.ReadLine()).ToString(); 

    //Declare an IMessageBus instance: 
    //Here, an object of the corresponding Message System 
     // (ex. Rabbit, Zmq, etc) is instantiated 
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    //Create a new Measurement object m 
    Measurement m = new Measurement(); 

    System.Console.WriteLine("Queue name to subscribe to: "); 
    string QueueName1 = (System.Console.ReadLine()).ToString(); 
    obj1.subscribeToQueue(QueueName1); 

    //Read message into m 
    m = obj1.readMessage(); 

    if (m != null) { 
     System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); 
     System.Console.WriteLine(" Time: {0}", m.time); 
     System.Console.WriteLine(" Value: {0}", m.value); 
    } 

    System.Console.WriteLine("Another queue name to subscribe to: "); 
    string QueueName2 = (System.Console.ReadLine()).ToString(); 
    obj1.subscribeToQueue(QueueName2); 

    m = obj1.readMessage(); 

    if (m != null) { 
     System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id); 
     System.Console.WriteLine(" Time: {0}", m.time); 
     System.Console.WriteLine(" Value: {0}", m.value); 
    } 

    obj1.disposeAll(); 
} 
} 

Respuesta

12

dos fuentes de información:

  1. http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss

  2. Usted realmente debe tratar de entender los ejemplos primero.

    • % Archivos de programa% \ RabbitMQ \ DotNetClient \ ejemplos \ src (ejemplos básicos)

    • llegar Con ejemplos completos de su repositorio de Mercurial (C# proyectos).

operaciones útil para comprender:

  • Declarar/Assert/Escuchar/Subscribe/Publicar

Re: su pregunta - no hay razón por la que puede' t tener múltiples escuchas O puede suscribirse a n rutas de enrutamiento con un solo escuchador en un "intercambio".

** re: sin bloqueo **

Un listenner típico consume mensajes de uno en uno. Puede sacarlos de la cola, o se colocarán automáticamente cerca del consumidor en forma de 'ventana' (definida a través de los parámetros de calidad de servicio qos). La belleza del enfoque es que se realiza mucho trabajo duro para usted (por ejemplo, confiabilidad, entrega garantizada, etc.).

Una característica clave de RabbitMQ es que si hay un error en el procesamiento, el mensaje se vuelve a agregar a la cola (una característica de tolerancia a errores).

Necesita saber más acerca de su situación.

A menudo, si publicas en la lista que mencioné anteriormente, puedes conseguir a alguien en el personal de RabbitMQ. Ellos son muy útiles.

Espero que ayude un poco. Al principio es mucho para entender, pero vale la pena persistir.


Q & Un

véase: http://www.rabbitmq.com/faq.html

P. ¿Puede suscribirse a varias colas utilizando la nueva suscripción (canal, queuename)?

Sí. O bien usa una clave de enlace, p. abc. *. hij, o abc. #. hij, o puede adjuntar enlaces múltiples. El primero asume que ha diseñado sus claves de enrutamiento en torno a algún tipo de principio que tenga sentido para usted (consulte las claves de enrutamiento en las preguntas frecuentes). Para este último, debe vincular a más de una cola.

Implementando n-bindings manualmente. ver: http://hg.rabbitmq.com/rabbitmq-dotnet-client/file/default/projects/client/RabbitMQ.Client/src/client/messagepatterns/Subscription.cs

no hay mucho código detrás de este patrón, por lo que podría rodar su propio patrón de suscripción si los comodines no son suficientes. podría heredar de esta clase y agregar otro método para enlaces adicionales ... probablemente esto funcionará o algo así (no probado).

spec El AQMP dice que son posibles de unión manual de múltiples: http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.bind

Q. And if so, how can I go through all the subscribed queues and return a message (null when no messages)?

con un abonado se le notifica cuando un mensaje está disponible. De lo contrario, lo que estás describiendo es una interfaz de extracción donde tiras el mensaje cuando lo solicitas. Si no hay mensajes disponibles, obtendrá un nulo como desee. Por cierto: el método Notify es probablemente más conveniente.

Q. Oh, and mind you that that I have all this operations in different methods. I will edit my post to reflect the code

código en vivo:

esta versión debe utilizar comodines para suscribirse a más de un enrutamiento clave

n manual de claves de enrutamiento utilizando la suscripción se deja como ejercicio para el lector. ;-) Creo que te inclinabas hacia una interfaz pull de todos modos. Por cierto, las interfaces de extracción son menos eficientes que las de notificación.

 using (Subscription sub = new Subscription(ch, QueueNme)) 
     { 
      foreach (BasicDeliverEventArgs ev in sub) 
      { 
       Process(ev.Body); 

     ... 

Nota: el foreach utiliza IEnumerable, y IEnumerable envuelve el caso de que un nuevo mensaje ha llegado a través del "rendimiento" comunicado. Efectivamente es un ciclo infinito.

--- ACTUALIZACIÓN

AMQP fue diseñado con la idea de mantener el número de conexiones TCP precio tan bajo como el número de solicitudes, por lo que significa que puede tener muchos canales por conexión.

el código en esta pregunta (edición 3) intenta utilizar dos suscriptores con un canal, mientras que debería (creo), ser un suscriptor por canal por hilo para evitar problemas de bloqueo. Sugestión: use una clave de enrutamiento "comodín". Es posible suscribirse a más de un nombre de cola distinto con el cliente java, pero el cliente .net no tiene este conocimiento implementado en la clase de ayuda del Suscriptor.

Si realmente necesita dos nombres de cola distintos en la misma cadena de suscripción, se sugiere la siguiente secuencia de extracción.net:

 using (IModel ch = conn.CreateModel()) { // btw: no reason to close the channel afterwards IMO 
      conn.AutoClose = true;     // no reason to closs the connection either. Here for completeness. 

      ch.QueueDeclare(queueName); 
      BasicGetResult result = ch.BasicGet(queueName, false); 
      if (result == null) { 
       Console.WriteLine("No message available."); 
      } else { 
       ch.BasicAck(result.DeliveryTag, false); 
       Console.WriteLine("Message:"); 
      } 

      return 0; 
     } 

- ACTUALIZACIÓN 2:.

de la lista RabbitMQ:

"asumen que element.Next() es el bloqueo en una de las suscripciones Usted podría recuperar las entregas de cada suscripción con un tiempo de espera de , léalo. Alternativamente, puede configurar una sola cola para recibir todas las mediciones y recuperar mensajes de ella con una sola suscripción. " (Emile)

Lo que eso significa es que cuando la primera cola está vacía, .Next() bloquea esperando a que aparezca el siguiente mensaje. es decir, el abonado tiene una espera para la próxima mensaje construido en

- ACTUALIZACIÓN 3:.

bajo NET, utilice la QueueingBasicConsumer para el consumo a partir de múltiples colas.

En realidad aquí es un hilo de ello para tener una idea sobre el uso:

Wait for a single RabbitMQ message with a timeout

- Update4:

algo más de información sobre la .QueueingBasicConsumer

Hay código de ejemplo aquí.

http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.4.0/rabbitmq-dotnet-client-1.4.0-net-2.0-htmldoc/type-RabbitMQ.Client.QueueingBasicConsumer.html

ejemplo copiado en la respuesta con algunas modificaciones (ver // < -----).

   IModel channel = ...; 
      QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
      channel.BasicConsume(queueName, false, null, consumer); //<----- 
      channel.BasicConsume(queueName2, false, null, consumer); //<----- 
      // etc. channel.BasicConsume(queueNameN, false, null, consumer); //<----- 

      // At this point, messages will be being asynchronously delivered, 
      // and will be queueing up in consumer.Queue. 

      while (true) { 
       try { 
        BasicDeliverEventArgs e = (BasicDeliverEventArgs) consumer.Queue.Dequeue(); 
        // ... handle the delivery ... 
        channel.BasicAck(e.DeliveryTag, false); 
       } catch (EndOfStreamException ex) { 
        // The consumer was cancelled, the model closed, or the 
        // connection went away. 
        break; 
       } 
      } 

- ACTUALIZACIÓN 5: un encuentro simple que actuará sobre cualquier cola (un método más lento, pero a veces más conveniente).

  ch.QueueDeclare(queueName); 
      BasicGetResult result = ch.BasicGet(queueName, false); 
      if (result == null) { 
       Console.WriteLine("No message available."); 
      } else { 
       ch.BasicAck(result.DeliveryTag, false); 
       Console.WriteLine("Message:"); 
       // deserialize body and display extra info here. 
      } 
+0

Muchas gracias por su comentario. Todavía estoy aprendiendo el sistema de mensajes y hay operaciones que aún no entiendo. Como escuchar También he visto cómo rabbitmq se suscribe a una cola. ¿Puede suscribirse a múltiples colas usando la nueva Suscripción (canal, nombre de cola)? Y si es así, ¿cómo puedo pasar por todas las colas suscritas y devolver un mensaje (nulo cuando no hay mensajes)? Ah, y recuerda que tengo todas estas operaciones en diferentes métodos. Editaré mi publicación para reflejar el código. – Demi

+0

Gracias de nuevo. Edité el código para las funciones de suscripción y escritura anteriores. Sin embargo, tengo este error de tiempo de ejecución: si me suscribo para decir dos colas y trato de leer mensajes, solo puedo recuperar mensajes por primera vez. No puedo ver dónde lo arruiné. ¿Puedes echar un vistazo a si para mí? – Demi

+0

@Demi ... eso tomó un poco de caza. Creo que te faltan "suscripciones. ¿Te acuestas()" al final de tu ciclo de lectura? Lo que significa 'He procesado este mensaje con éxito, así que dame el siguiente'. Avísame si fue eso. De lo contrario, te ves cerca. – sgtz

1

La forma más sencilla es utilizar el EventingBasicConsumer. Tengo un ejemplo en mi sitio sobre cómo usarlo. RabbitMQ EventingBasicConsumer

Esta clase de consumidor expone un evento recibido que puede utilizar y, por lo tanto, NO bloquea. El resto del código básicamente permanece igual.

Cuestiones relacionadas