2011-06-23 10 views
7

Soy nuevo en HornetQ así que por favor tengan paciencia conmigo. Déjame que te diga primero mis requisitos:Mensajes de HornetQ que aún permanecen en la cola después de consumir usando la API central

Necesito un middleware de mensajes en cola que pueda pasar mensajes, de aproximadamente 1k de tamaño, entre diferentes procesos con baja latencia y persistencia (es decir, debería sobrevivir a bloqueos del sistema). Tendría múltiples procesos escribiendo en las mismas colas y, de manera similar, múltiples lecturas de proceso desde la misma cola.

Para esto elegí HornetQ ya que tiene la mejor calificación para el envío de mensajes con persistencia.

Actualmente estoy usung Hornetq v2.2.2Final como stand alone servidor.
soy capaz de crear con éxito colas duraderos/no duraderos utilizando núcleo api(ClientSession), y con éxito de enviar mensajes a la cola (ClientProducer).
De forma similar, puedo leer los mensajes de la cola usando la API central (Cliente).

El problema viene después de esto cuando el cliente ha leído el mensaje, el mensaje todavía permanece en la cola, es decir el número de mensajes en la cola se mantiene constante. Tal vez estoy obteniendo este error pero estaba bajo esta impresión de que una vez que se consume el mensaje (leer + ack), se elimina de la cola. Pero esto no está sucediendo en mi caso, y los mismos mensajes están siendo leer una y otra vez

Además, me gustaría decir que he intentado utilizar colas no duraderas con mensajes no duraderos. pero el problema sigue siendo.

Código para el productor que estoy usando:

public class HQProducer implements Runnable { 

    private ClientProducer producer; 
    private boolean killme; 
    private ClientSession session; 
    private boolean durableMsg; 

    public HQProducer(String host, int port, String address, String queueName, 
      boolean deleteQ, boolean durable, boolean durableMsg, int pRate) { 
     this.durableMsg = durableMsg; 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      if (queueExists(queueName)) { 
       if (deleteQ) { 
        System.out.println("Deleting existing queue :: " + queueName); 
        session.deleteQueue(queueName); 
        System.out.println("Creating queue :: " + queueName); 
        session.createQueue(address, queueName, true); 
       } 
      } else { 
       System.out.println("Creating new queue :: " + queueName); 
       session.createQueue(address, queueName, durable); 
      } 
      producer = session.createProducer(SimpleString.toSimpleString(address), pRate); 

      killme = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killme) { 
      try { 
       ClientMessage message = session.createMessage(durableMsg); 

       message.getBodyBuffer().writeString("Hello world"); 

       producer.send(message); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("Producer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killme) { 
     this.killme = killme; 
    } 

    private boolean queueExists(String qname) { 
     boolean res = false; 
     try { 
      //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname)); 
      QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname)); 
      if (queueQuery.isExists()) { 
       res = true; 
      } 
     } catch (HornetQException ex) { 
      res = false; 
     } 
     return res; 
    } 
} 

también el código para los consumidores es: configuración del servidor

public class HQConsumer implements Runnable { 

    private ClientSession session; 
    private ClientConsumer consumer; 
    private boolean killMe; 

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) { 
     try { 
      HashMap map = new HashMap(); 
      map.put("host", host); 
      map.put("port", port); 

      TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map); 

      ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config); 

      ClientSessionFactory factory = locator.createSessionFactory(); 

      session = factory.createSession(); 

      session.start(); 

      consumer = session.createConsumer(queueName, "",0,-1,browseOnly); 

      killMe = false; 
     } catch (Exception ex) { 
      Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    @Override 
    public void run() { 
     long time = System.currentTimeMillis(); 
     int cnt = 0; 
     long timediff; 
     while (!killMe) { 
      try { 
       ClientMessage msgReceived = consumer.receive(); 
       msgReceived.acknowledge(); 
       //System.out.println("message = " + msgReceived.getBodyBuffer().readString()); 
       cnt++; 
       timediff = ((System.currentTimeMillis() - time)/1000); 
       if (timediff >= 1) { 
        System.out.println("ConSumer tps :: " + cnt); 
        cnt = 0; 
        time = System.currentTimeMillis(); 
       } 
      } catch (HornetQException ex) { 
       Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } 
     try { 
      session.close(); 
     } catch (HornetQException ex) { 
      Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 

    public void setKillMe(boolean killMe) { 
     this.killMe = killMe; 
    } 
} 

HornetQ ::

<configuration xmlns="urn:hornetq" 
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
       xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd"> 

    <paging-directory>${data.dir:../data}/paging</paging-directory> 

    <bindings-directory>${data.dir:../data}/bindings</bindings-directory> 

    <journal-directory>${data.dir:../data}/journal</journal-directory> 

    <journal-min-files>10</journal-min-files> 

    <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory> 

    <connectors> 
     <connector name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </connector> 

     <connector name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     </connector> 
    </connectors> 

    <acceptors> 
     <acceptor name="netty"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.port:5445}"/> 
     </acceptor> 

     <acceptor name="netty-throughput"> 
     <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class> 
     <param key="host" value="${hornetq.remoting.netty.host:localhost}"/> 
     <param key="port" value="${hornetq.remoting.netty.batch.port:5455}"/> 
     <param key="batch-delay" value="50"/> 
     <param key="direct-deliver" value="false"/> 
     </acceptor> 
    </acceptors> 

    <security-settings> 
     <security-setting match="#"> 
     <permission type="createNonDurableQueue" roles="guest"/> 
     <permission type="deleteNonDurableQueue" roles="guest"/> 
     <permission type="createDurableQueue" roles="guest"/> 
     <permission type="deleteDurableQueue" roles="guest"/> 
     <permission type="consume" roles="guest"/> 
     <permission type="send" roles="guest"/> 
     </security-setting> 
    </security-settings> 

    <address-settings> 
     <!--default for catch all--> 
     <address-setting match="#"> 
     <dead-letter-address>jms.queue.DLQ</dead-letter-address> 
     <expiry-address>jms.queue.ExpiryQueue</expiry-address> 
     <redelivery-delay>0</redelivery-delay> 
     <max-size-bytes>10485760</max-size-bytes>  
     <message-counter-history-day-limit>10</message-counter-history-day-limit> 
     <address-full-policy>BLOCK</address-full-policy> 
     </address-setting> 
    </address-settings> 

</configuration> 
+0

a/c para [esto] (http: // docs.jboss.org/hornetq/2.2.2.Final/user-manual/en/html/messaging-concepts.html#d0e354) debe confirmar el mensaje después de procesar ¿está haciendo lo mismo? –

Respuesta

13

con la API del núcleo hornetq tienes que ackear un mensaje explícitamente. No veo dónde está pasando eso en tu prueba.

Si no está registrando, esta es la razón por la cual sus mensajes están bloqueando. Necesitaría ver su ejemplo completo para darle una respuesta completa.

también: Debe definir su createSession con: createSession (cierto, cierto, 0)

La API de núcleo tiene una opción para ACK por lotes. No está utilizando una sesión transaccionada, por lo que no enviará acks al servidor hasta que llegue al ackBatchSize configurado en su servidorLocator. Con esto en su lugar, cualquier ack se enviará al servidor tan pronto como llame a acknowledge() en su mensaje.

La opción que está utilizando actualmente es equivalente a JMS DUPS_OK ​​con un cierto DUPS_SIZE.

(Mensaje editado mi respuesta inicial después de alguna iteración con usted)

+1

'ClientMessage msgReceived = consumer.receive(); msgReceived.acknowledge(); '.. Estoy confirmando el código –

+0

La API principal tiene una opción para procesar por lotes ACK. No está utilizando una sesión transaccionada, por lo que no enviará acks al servidor hasta que llegue al ackBatchSize configurado en su servidorLocator. Debe definir su createSession con: createSession (true, true, 0); Con esto en su lugar, cualquier confirmación se enviará al servidor tan pronto como llame a acknowledge() en su mensaje –

+1

No ha vuelto a este hilo. ¿Supongo que solucionas tu problema? –

2

Ajuste del ackbatchsize me ayudó a solucionar el problema .. Gracias por la ayuda

+2

probablemente deberías votar en una de las respuestas aquí. –

Cuestiones relacionadas