Soy nuevo en RabbitMQ. Quiero poder leer mensajes sin bloquear cuando hay varias colas (para leer). ¿Alguna información sobre cómo puedo hacer eso?Lectura de múltiples colas, RabbitMQ
// EDIT 1
public class Rabbit : IMessageBus
{
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
Subscription sub = null;
public void writeMessage(Measurement m1) {
byte[] body = Measurement.AltSerialize(m1);
int msgCount = 1;
Console.WriteLine("Sending message to queue {1} via the amq.direct exchange.", m1.id);
string finalQueue = publishToQueue(m1.id);
while (msgCount --> 0) {
channel.BasicPublish("amq.direct", finalQueue, null, body);
}
Console.WriteLine("Done. Wrote the message to queue {0}.\n", m1.id);
}
public string publishToQueue(string firstQueueName) {
Console.WriteLine("Creating a queue and binding it to amq.direct");
string queueName = channel.QueueDeclare(firstQueueName, true, false, false, null);
channel.QueueBind(queueName, "amq.direct", queueName, null);
Console.WriteLine("Done. Created queue {0} and bound it to amq.direct.\n", queueName);
return queueName;
}
public Measurement readMessage() {
Console.WriteLine("Receiving message...");
Measurement m = new Measurement();
int i = 0;
foreach (BasicDeliverEventArgs ev in sub) {
m = Measurement.AltDeSerialize(ev.Body);
//m.id = //get the id here, from sub
if (++i == 1)
break;
sub.Ack();
}
Console.WriteLine("Done.\n");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
}
public static string MsgSysName;
public string MsgSys
{
get
{
return MsgSysName;
}
set
{
MsgSysName = value;
}
}
public Rabbit(string _msgSys) //Constructor
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
connection = factory.CreateConnection();
channel = connection.CreateModel();
//consumer = new QueueingBasicConsumer(channel);
System.Console.WriteLine("\nMsgSys: RabbitMQ");
MsgSys = _msgSys;
}
~Rabbit()
{
//observer??
connection.Dispose();
//channel.Dispose();
System.Console.WriteLine("\nDestroying RABBIT");
}
}
// EDIT 2
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
public Measurement readMessage()
{
Measurement m = new Measurement();
foreach(Subscription element in subscriptions)
{
foreach (BasicDeliverEventArgs ev in element) {
//ev = element.Next();
if(ev != null) {
m = Measurement.AltDeSerialize(ev.Body);
return m;
}
m = null;
}
}
System.Console.WriteLine("No message in the queue(s) at this time.");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
subscriptions.Add(sub);
}
// Editar 3
//MessageHandler.cs
public class MessageHandler
{
// Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();
ConnectionFactory factory = null;
IConnection connection = null;
IModel channel = null;
QueueingBasicConsumer consumer = null;
private List<Subscription> subscriptions = new List<Subscription>();
Subscription sub = null;
public void writeMessage (Measurement m1)
{
byte[] body = Measurement.AltSerialize(m1);
//declare a queue if it doesn't exist
publishToQueue(m1.id);
channel.BasicPublish("amq.direct", m1.id, null, body);
Console.WriteLine("\n [x] Sent to queue {0}.", m1.id);
}
public void publishToQueue(string queueName)
{
string finalQueueName = channel.QueueDeclare(queueName, true, false, false, null);
channel.QueueBind(finalQueueName, "amq.direct", "", null);
}
public Measurement readMessage()
{
Measurement m = new Measurement();
foreach(Subscription element in subscriptions)
{
if(element.QueueName == null)
{
m = null;
}
else
{
BasicDeliverEventArgs ev = element.Next();
if(ev != null) {
m = Measurement.AltDeSerialize(ev.Body);
m.id = element.QueueName;
element.Ack();
return m;
}
m = null;
}
element.Ack();
}
System.Console.WriteLine("No message in the queue(s) at this time.");
return m;
}
public void subscribeToQueue(string queueName)
{
sub = new Subscription(channel, queueName);
subscriptions.Add(sub);
}
public static string MsgSysName;
public string MsgSys
{
get
{
return MsgSysName;
}
set
{
MsgSysName = value;
}
}
public MessageHandler(string _msgSys) //Constructor
{
factory = new ConnectionFactory();
factory.HostName = "localhost";
connection = factory.CreateConnection();
channel = connection.CreateModel();
consumer = new QueueingBasicConsumer(channel);
System.Console.WriteLine("\nMsgSys: RabbitMQ");
MsgSys = _msgSys;
}
public void disposeAll()
{
connection.Dispose();
channel.Dispose();
foreach(Subscription element in subscriptions)
{
element.Close();
}
System.Console.WriteLine("\nDestroying RABBIT");
}
}
//App1.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
public class MainClass
{
public static void Main()
{
MessageHandler obj1 = MessageHandler("Rabbit");
System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
//Create new Measurement messages
Measurement m1 = new Measurement("q1", 2345, 23.456);
Measurement m2 = new Measurement("q2", 222, 33.33);
System.Console.WriteLine("Test message 1:\n ID: {0}", m1.id);
System.Console.WriteLine(" Time: {0}", m1.time);
System.Console.WriteLine(" Value: {0}", m1.value);
System.Console.WriteLine("Test message 2:\n ID: {0}", m2.id);
System.Console.WriteLine(" Time: {0}", m2.time);
System.Console.WriteLine(" Value: {0}", m2.value);
// Ask queue name and store it
System.Console.WriteLine("\nName of queue to publish to: ");
string queueName = (System.Console.ReadLine()).ToString();
obj1.publishToQueue(queueName);
// Write message to the queue
obj1.writeMessage(m1);
System.Console.WriteLine("\nName of queue to publish to: ");
string queueName2 = (System.Console.ReadLine()).ToString();
obj1.publishToQueue(queueName2);
obj1.writeMessage(m2);
obj1.disposeAll();
}
}
//App2.cs
using System;
using System.IO;
using UtilityMeasurement;
using UtilityMessageBus;
public class MainClass
{
public static void Main()
{
//Asks for the message system
System.Console.WriteLine("\nEnter name of messageing system: ");
System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
string MsgSysName = (System.Console.ReadLine()).ToString();
//Declare an IMessageBus instance:
//Here, an object of the corresponding Message System
// (ex. Rabbit, Zmq, etc) is instantiated
IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);
System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);
//Create a new Measurement object m
Measurement m = new Measurement();
System.Console.WriteLine("Queue name to subscribe to: ");
string QueueName1 = (System.Console.ReadLine()).ToString();
obj1.subscribeToQueue(QueueName1);
//Read message into m
m = obj1.readMessage();
if (m != null) {
System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
System.Console.WriteLine(" Time: {0}", m.time);
System.Console.WriteLine(" Value: {0}", m.value);
}
System.Console.WriteLine("Another queue name to subscribe to: ");
string QueueName2 = (System.Console.ReadLine()).ToString();
obj1.subscribeToQueue(QueueName2);
m = obj1.readMessage();
if (m != null) {
System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}", m.id, m.id);
System.Console.WriteLine(" Time: {0}", m.time);
System.Console.WriteLine(" Value: {0}", m.value);
}
obj1.disposeAll();
}
}
Muchas gracias por su comentario. Todavía estoy aprendiendo el sistema de mensajes y hay operaciones que aún no entiendo. Como escuchar También he visto cómo rabbitmq se suscribe a una cola. ¿Puede suscribirse a múltiples colas usando la nueva Suscripción (canal, nombre de cola)? Y si es así, ¿cómo puedo pasar por todas las colas suscritas y devolver un mensaje (nulo cuando no hay mensajes)? Ah, y recuerda que tengo todas estas operaciones en diferentes métodos. Editaré mi publicación para reflejar el código. – Demi
Gracias de nuevo. Edité el código para las funciones de suscripción y escritura anteriores. Sin embargo, tengo este error de tiempo de ejecución: si me suscribo para decir dos colas y trato de leer mensajes, solo puedo recuperar mensajes por primera vez. No puedo ver dónde lo arruiné. ¿Puedes echar un vistazo a si para mí? – Demi
@Demi ... eso tomó un poco de caza. Creo que te faltan "suscripciones. ¿Te acuestas()" al final de tu ciclo de lectura? Lo que significa 'He procesado este mensaje con éxito, así que dame el siguiente'. Avísame si fue eso. De lo contrario, te ves cerca. – sgtz