2010-12-14 29 views
23

¿Es posible enviar un mensaje a través de RabbitMQ con un poco de retraso? Por ejemplo, quiero caducar la sesión del cliente después de 30 minutos, y le envío un mensaje que se procesará después de 30 minutos.Mensaje demorado en RabbitMQ

+0

¿Es necesario utilizar RabbitMQ? –

+1

Sí, esta función está disponible desde RabbitMQ-3.5.8. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/ – lambodar

+0

Si usa Spring AMQP, hay [soporte para el complemento] (https://docs.spring.io/spring-amqp/reference/htmlsingle/# delayed-message-exchange). – Gruber

Respuesta

5

Actualmente no es posible. Debe almacenar sus marcas de tiempo de caducidad en una base de datos o algo similar, y luego tener un programa auxiliar que lea esas marcas de tiempo y ponga un mensaje en cola.

Los mensajes demorados son una función que a menudo se solicita, ya que son útiles en muchas situaciones. Sin embargo, si su necesidad es caducar las sesiones del cliente, creo que la mensajería no es la solución ideal para usted, y que otro enfoque podría funcionar mejor.

9

Con el lanzamiento de RabbitMQ v2.8, entrega programada ya está disponible, sino como una función indirecta: http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

+0

Intenté este enfoque, pero acerté algunos problemas, ¿alguna sugerencia? http://blog.james-carr.org/2012/03/30/rabbitmq-sending-a-message-to-be-consumed-later/#comment-502703 –

+5

Hice una espiga y acerté algunas cosas importantes: 1. Los mensajes solo son DLQ: es cuando está en la parte superior de la Q (http://www.rabbitmq.com/ttl.html - sección Advertencias) Esto significa que si configuro primero el mensaje 1 para que expire en 4 horas y msg2 caducará en 1 hora msg2 solo caducará una vez que msg1 haya expirado. 2. El TTL para el mensaje es guardado por Rabbit, así que digamos que usa un corto tiempo de espera de 10 s. Si el consumidor no ha podido consumir el mensaje dentro de 10 segundos después de su vencimiento (debido a un retraso acumulado), se descartará y perderá Lo anterior se ha verificado con Rabbit 3.0.1 ¿Vieron alguna solución? ? –

6

Como no tengo la reputación suficiente para añadir comentarios, la publicación de una nueva respuesta. Esto es solo una adición a lo que ya se ha discutido en http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Excepto que en lugar de establecer ttl en los mensajes, puede configurarlo en el nivel de la cola. También puede evitar crear un nuevo intercambio solo por el bien de redirigir los mensajes a diferentes Queue. A continuación se muestra el código de Java:

Productor:

import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
import java.util.HashMap; 
import java.util.Map; 

public class DelayedProducer { 
    private final static String QUEUE_NAME = "ParkingQueue"; 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory connectionFactory = new ConnectionFactory(); 
     connectionFactory.setHost("localhost"); 
     Connection connection = connectionFactory.newConnection(); 
     Channel channel = connection.createChannel(); 

     Map<String, Object> arguments = new HashMap<String, Object>(); 
     arguments.put("x-message-ttl", 10000); 
     arguments.put("x-dead-letter-exchange", ""); 
     arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME); 
     channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); 

     for (int i=0; i<5; i++) { 
      String message = "This is a sample message " + i; 
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
      System.out.println("message "+i+" got published to the queue!"); 
      Thread.sleep(3000); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

Consumidor:

import com.rabbitmq.client.ConnectionFactory; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.QueueingConsumer; 

public class Consumer { 
    private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; 

    public static void main(String[] args) throws Exception{ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     channel.queueDeclare(QUEUE_NAME, false, false, false, null); 
     System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     boolean autoAck = false; 
     channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); 

     while (true) { 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      System.out.println(" [x] Received '" + message + "'"); 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 
     } 

    } 
} 
+0

Muchas gracias. Creo que tienes un pequeño error en la cola del consumidor que declara channel.queueDeclare (QUEUE_NAME, false, false, false, null); Debería tener "DESTINATION_QUEUE_NAME" en lugar de "QUEUE_NAME". Realmente muchas gracias –

5

Parece que this blog post describe el uso de la bolsa de papel mojado y TTL mensaje a hacer algo similar.

El siguiente código usa CoffeeScript y Node.JS para acceder a Rabbit e implementar algo similar.

amqp = require 'amqp' 
events = require 'events' 
em  = new events.EventEmitter() 
conn = amqp.createConnection() 

key = "send.later.#{new Date().getTime()}" 
conn.on 'ready', -> 
    conn.queue key, { 
    arguments:{ 
     "x-dead-letter-exchange":"immediate" 
    , "x-message-ttl": 5000 
    , "x-expires": 6000 
    } 
    }, -> 
    conn.publish key, {v:1}, {contentType:'application/json'} 

    conn.exchange 'immediate' 

    conn.queue 'right.now.queue', { 
     autoDelete: false 
    , durable: true 
    }, (q) -> 
    q.bind('immediate', 'right.now.queue') 
    q.subscribe (msg, headers, deliveryInfo) -> 
     console.log msg 
     console.log headers 
0

Supongamos que tenemos control sobre el consumidor, que podría lograr el retraso en el consumidor como esto ??:

Si estamos seguros de que el mensaje de orden n en la cola siempre tiene un retraso menor que el n + 1er mensaje (esto puede ser cierto para muchos casos de uso): el productor envía timeInformation en la tarea que transporta el tiempo en el que este trabajo debe ejecutarse (currentTime + delay). El consumidor:

1) Lee el scheduledTime de la tarea

2) si horaActual> scheduledTime adelante.

retraso Else = scheduledTime - horaActual

sueño durante el tiempo indicado por retraso

El consumidor siempre está configurado con un parámetro de concurrencia. Entonces, los demás mensajes esperarán en la cola hasta que el consumidor finalice el trabajo. Por lo tanto, esta solución podría funcionar bien, aunque se ve incómoda, especialmente por los grandes retrasos.

5

Gracias a la respuesta de Norman, pude implementarlo en NodeJS.

Todo está bastante claro desde el código. Espero que le ahorre tiempo a alguien.

var ch = channel; 
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false}); 
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false}); 

// setup intermediate queue which will never be listened. 
// all messages are TTLed so when they are "dead", they come to another exchange 
ch.assertQueue("my_intermediate_queue", { 
     deadLetterExchange: "my_final_delayed_exchange", 
     messageTtl: 5000, // 5sec 
}, function (err, q) { 
     ch.bindQueue(q.queue, "my_intermediate_exchange", ''); 
}); 

ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { 
     ch.bindQueue(q.queue, "my_final_delayed_exchange", ''); 

     ch.consume(q.queue, function (msg) { 
      console.log("delayed - [x] %s", msg.content.toString()); 
     }, {noAck: true}); 
}); 
4

Hay dos enfoques que puede probar:

viejo enfoque: establecer el TTL (tiempo de vida) de cabecera en cada mensaje/cola (política) y luego introducir un DLQ para manejarlo. una vez que el ttl expiró, sus mensajes pasarán de DLQ a cola principal para que su oyente pueda procesarlo.

enfoque más reciente: Recientemente RabbitMQ se le ocurrió RabbitMQ mensaje retrasado Plugin, mediante el cual se puede lograr lo mismo y este soporte para plugins disponibles desde RabbitMQ-3.5.8.

Puede declarar un intercambio con el tipo x-mensaje retrasado y luego publicar mensajes con el encabezado personalizado x-delay expresando en milisegundos el tiempo de demora para el mensaje. El mensaje será entregado a las respectivas colas después de milisegundos de retardo x-

Más aquí: git