2010-05-10 23 views
6

Me gustaría enviar un mensaje a un servidor RabbitMQ y luego esperar un mensaje de respuesta (en una cola de "respuesta a"). Por supuesto, no quiero esperar para siempre en caso de que la aplicación que está procesando estos mensajes no funcione, debe haber un tiempo de espera. Parece una tarea muy básica, pero no puedo encontrar la forma de hacerlo. Ahora me he encontrado con este problema tanto con py-amqplib como con RabbitMQ .NET client.Espere un solo mensaje RabbitMQ con un tiempo de espera

La mejor solución que tengo hasta ahora es para sondear utilizando basic_get con sleep en el medio, pero esto es bastante feo:

def _wait_for_message_with_timeout(channel, queue_name, timeout): 
    slept = 0 
    sleep_interval = 0.1 

    while slept < timeout: 
     reply = channel.basic_get(queue_name) 
     if reply is not None: 
      return reply 

     time.sleep(sleep_interval) 
     slept += sleep_interval 

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout) 

seguramente hay alguna manera mejor?

Respuesta

8

Acabo de agregar soporte de tiempo de espera para amqplib en carrot.

Esta es una subclase de amqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi es una versión de channel.wait capaz de recibir en un número arbitrario de canales.

Supongo que esto podría combinarse en sentido ascendente en algún punto.

+1

Ahora esto es lo que llamo una "gran respuesta": "¡está arreglado!" Aceptar - con la esperanza de que * se * fusione en amqplib. – EMP

+0

@EMP jaja :) divertido :) –

1

Esto parece romper la idea de procesamiento asincrónico, pero si debe, creo que la forma correcta de hacerlo es usar RpcClient.

+0

Mientras que en sí rpcclient no es útil para mí, mirando a su puesta en práctica revela el enfoque de usar: crear un 'QueueingBasicConsumer' y espere en su cola, que admite un tiempo de espera. Esto no es tan complejo en .NET como yo temía. – EMP

2

Hay un ejemplo here usando qpid con un msg = q.get(timeout=1) que debe hacer lo que quiera. Lo siento, no sé qué otras bibliotecas de clientes de AMQP implementan tiempos de espera (y, en particular, no conozco los dos específicos que mencionaste).

+0

Al observar el origen de qpid, parece utilizar el mismo enfoque que el cliente .NET: 'basic_consume' con una cola y esperando en la cola con un tiempo de espera agotado. Parece que eso es lo que tendré que hacer. – EMP

8

Esto es lo que terminé haciendo en el cliente .NET:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) 
{ 
    var consumer = new QueueingBasicConsumer(Channel); 
    var tag = Channel.BasicConsume(queueName, true, null, consumer); 
    try 
    { 
     object result; 
     if (!consumer.Queue.Dequeue(timeoutMs, out result)) 
      throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs/1000.0)); 

     return ((BasicDeliverEventArgs)result).Body; 
    } 
    finally 
    { 
     Channel.BasicCancel(tag); 
    } 
} 

Por desgracia, no puedo hacer lo mismo con py-amqplib, debido a su método basic_consume no llama a la devolución de llamada a menos que llame channel.wait() y channel.wait() no admite tiempos de espera Esta tonta limitación (que sigo encontrando) significa que si nunca recibes otro mensaje, tu hilo queda congelado para siempre.

1

Rabbit ahora le permite agregar eventos de tiempo de espera. Simplemente envolver su código en un intento de captura y luego lanzar excepciones en el tiempo de espera y desconecte los manipuladores:

try{ 
    using (IModel channel = rabbitConnection.connection.CreateModel()) 
    { 
     client = new SimpleRpcClient(channel, "", "", queue); 
     client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity 
     client.TimedOut += RpcTimedOutHandler; 
     client.Disconnected += RpcDisconnectedHandler; 
     byte[] replyMessageBytes = client.Call(message); 
     return replyMessageBytes; 
    } 
} 
catch (Exception){ 
    //Handle timeout and disconnect here 
} 
private void RpcDisconnectedHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC disconnect exception occured."); 
} 

private void RpcTimedOutHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC timeout exception occured."); 
} 
Cuestiones relacionadas