Soy nuevo en HornetQ así que por favor tengan paciencia conmigo. Déjame que te diga primero mis requisitos:Mensajes de HornetQ que aún permanecen en la cola después de consumir usando la API central
Necesito un middleware de mensajes en cola que pueda pasar mensajes, de aproximadamente 1k de tamaño, entre diferentes procesos con baja latencia y persistencia (es decir, debería sobrevivir a bloqueos del sistema). Tendría múltiples procesos escribiendo en las mismas colas y, de manera similar, múltiples lecturas de proceso desde la misma cola.
Para esto elegí HornetQ ya que tiene la mejor calificación para el envío de mensajes con persistencia.
Actualmente estoy usung Hornetq v2.2.2Final como stand alone servidor.
soy capaz de crear con éxito colas duraderos/no duraderos utilizando núcleo api(ClientSession), y con éxito de enviar mensajes a la cola (ClientProducer).
De forma similar, puedo leer los mensajes de la cola usando la API central (Cliente).
El problema viene después de esto cuando el cliente ha leído el mensaje, el mensaje todavía permanece en la cola, es decir el número de mensajes en la cola se mantiene constante. Tal vez estoy obteniendo este error pero estaba bajo esta impresión de que una vez que se consume el mensaje (leer + ack), se elimina de la cola. Pero esto no está sucediendo en mi caso, y los mismos mensajes están siendo leer una y otra vez
Además, me gustaría decir que he intentado utilizar colas no duraderas con mensajes no duraderos. pero el problema sigue siendo.
Código para el productor que estoy usando:
public class HQProducer implements Runnable {
private ClientProducer producer;
private boolean killme;
private ClientSession session;
private boolean durableMsg;
public HQProducer(String host, int port, String address, String queueName,
boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
this.durableMsg = durableMsg;
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
if (queueExists(queueName)) {
if (deleteQ) {
System.out.println("Deleting existing queue :: " + queueName);
session.deleteQueue(queueName);
System.out.println("Creating queue :: " + queueName);
session.createQueue(address, queueName, true);
}
} else {
System.out.println("Creating new queue :: " + queueName);
session.createQueue(address, queueName, durable);
}
producer = session.createProducer(SimpleString.toSimpleString(address), pRate);
killme = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killme) {
try {
ClientMessage message = session.createMessage(durableMsg);
message.getBodyBuffer().writeString("Hello world");
producer.send(message);
cnt++;
timediff = ((System.currentTimeMillis() - time)/1000);
if (timediff >= 1) {
System.out.println("Producer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killme) {
this.killme = killme;
}
private boolean queueExists(String qname) {
boolean res = false;
try {
//ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
if (queueQuery.isExists()) {
res = true;
}
} catch (HornetQException ex) {
res = false;
}
return res;
}
}
también el código para los consumidores es: configuración del servidor
public class HQConsumer implements Runnable {
private ClientSession session;
private ClientConsumer consumer;
private boolean killMe;
public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
try {
HashMap map = new HashMap();
map.put("host", host);
map.put("port", port);
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);
ClientSessionFactory factory = locator.createSessionFactory();
session = factory.createSession();
session.start();
consumer = session.createConsumer(queueName, "",0,-1,browseOnly);
killMe = false;
} catch (Exception ex) {
Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
}
}
@Override
public void run() {
long time = System.currentTimeMillis();
int cnt = 0;
long timediff;
while (!killMe) {
try {
ClientMessage msgReceived = consumer.receive();
msgReceived.acknowledge();
//System.out.println("message = " + msgReceived.getBodyBuffer().readString());
cnt++;
timediff = ((System.currentTimeMillis() - time)/1000);
if (timediff >= 1) {
System.out.println("ConSumer tps :: " + cnt);
cnt = 0;
time = System.currentTimeMillis();
}
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
try {
session.close();
} catch (HornetQException ex) {
Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
public void setKillMe(boolean killMe) {
this.killMe = killMe;
}
}
HornetQ ::
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<paging-directory>${data.dir:../data}/paging</paging-directory>
<bindings-directory>${data.dir:../data}/bindings</bindings-directory>
<journal-directory>${data.dir:../data}/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>
<connectors>
<connector name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</connector>
<connector name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
</connector>
</connectors>
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.port:5445}"/>
</acceptor>
<acceptor name="netty-throughput">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="${hornetq.remoting.netty.host:localhost}"/>
<param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/>
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</acceptor>
</acceptors>
<security-settings>
<security-setting match="#">
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
<permission type="createDurableQueue" roles="guest"/>
<permission type="deleteDurableQueue" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<address-settings>
<!--default for catch all-->
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>10485760</max-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
<address-full-policy>BLOCK</address-full-policy>
</address-setting>
</address-settings>
</configuration>
a/c para [esto] (http: // docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354) debe confirmar el mensaje después de procesar ¿está haciendo lo mismo? –