2012-01-15 13 views
5

Tengo un sistema que implementa Camel y ActiveMQ para la comunicación entre algunos servidores. Me gustaría saber si hay una forma de caducar y borrar automáticamente los mensajes enviados a una cola después de X período de tiempo. Dado que el servidor de origen (rellenando la cola) no sabrá si alguien está recogiendo los mensajes, no quiero que mi cola crezca hasta que sea tan grande que algo se cuelgue. El karma de bonificación apunta a alguien que puede ayudar y proporciona la forma de Java DSL para implementar esta característica.Expiración automática de mensajes en Camel

Solución

// expire message after 2 minutes 
long ttl = System.currentTimeMillis() + 120000; 
// send our info one-way to the group topic 
camelTemplate.sendBodyAndHeader("jms:queue:stats", ExchangePattern.InOnly, stats, "JMSExpiration", ttl); 

Respuesta

1

Bueno, la setJMSExpiration (espiración larga):

es algo que no debe llama cuando eres el cliente Ver mi charla sobre esto un poco en el foro ActiveMQ.

http://apache-qpid-developers.2158895.n2.nabble.com/MRG-Java-JMS-Expiration-td7171854.html

+0

En su mensaje en el foro, parecía que deseaba una caducidad de 10 minutos, pero en su lugar había establecido 10 segundos. ex. (10 * (60 * 1000)) vs (10 * 1000) –

+0

@Mondain, no, quería 10 segundos. 10 minutos es una configuración predeterminada de Qpid de cuando los mensajes caducados se borran de las colas. Por ejemplo: usted envió un mensaje a la cola con vencimiento establecido en 10 segundos, por lo que después de 10 segundos QPid expiró los mensajes y ningún consumidor puede consumirlo, pero de facto el mensaje es eliminar bu QPid cuando se retira la Política de Remoción (s) patear, y la configuración predeterminada es que se activan después de 10 minutos – Eugene

+0

Ok, entendí mal lo que había publicado allí. –

3

también la mente que los relojes entre el cliente - corredor necesitan para estar en sintonía, para el vencimiento para que funcione correctamente. Si los relojes no están sincronizados, es posible que el conjunto de caducidad del cliente ya haya expirado cuando se reciba el mensaje en el intermediario. O el tiempo del cliente está por delante del intermediario, por lo que el vencimiento es más largo que los 10 segundos.

Me suena un poco por qué el vencimiento se basa en el tiempo del cliente. Entonces, AMQ ofrece un complemento, para arreglar esto al realinear el tiempo, para ser solo un agente. Ver http://activemq.apache.org/timestampplugin.html

1

De nuestra parte elegimos agregar tiempo de caducidad a destinos específicos mediante el uso de una ruta en camello implementada en el servicio ActiveMQ mismo.

Lo único que debe hacer es crear un archivo XML como el siguiente con el nombre, p. Ej. setJMSExpiration.xml:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <camelContext id="camel-set-expiration" xmlns="http://camel.apache.org/schema/spring"> 
    <!-- Copy route for each destination to expire --> 
    <route id="setJMSExpiration.my.queue.dlq"> 
     <from uri="broker:queue:MY.QUEUE.DLQ"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 1 day --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 86400000}</spel> 
     </setHeader> 
     <to uri="broker:queue:MY.QUEUE.DLQ"/> 
    </route> 
    <route id="setJMSExpiration.another.queue"> 
     <from uri="broker:queue:ANOTHER.QUEUE"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 5 days --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 432000000}</spel> 
     </setHeader> 
     <to uri="broker:queue:ANOTHER.QUEUE"/> 
    </route> 
    </camelContext> 
</beans> 

e importarlo en su configuración con activemq.xml:

<!-- Add default Expiration (file in the same directory) --> 
<import resource="setJMSExpiration.xml"/> 

Alternativamente, también puede proporcionar una información precisa per destination policies si no desea que los mensajes caducados para llegar a la cola ActiveMQ.DLQ.

<policyEntry queue="MY.QUEUE.DLQ"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 
<policyEntry queue="ANOTHER.QUEUE"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 

La única limitación de esta manera es que no se puede utilizar fácilmente como comodines que se codifica aquí (se puede, pero lo necesitará algunas adaptaciones mediante el uso de cabecera destino JMS en la ruta de camello).

Tratamos de dejar que los productores definan timeToLive (y forzarlos tanto como sea posible) pero no siempre es posible forzarlos a cambiar su código, esto para minimizar el número de tales rutas.