2011-08-15 24 views
32

Parece que cuanto más tiempo tengo mi servidor rabbitmq funcionando, más problemas tengo con los mensajes no confirmados. Me encantaría ponerlos nuevamente. De hecho, parece que hay un comando amqp para hacer esto, pero solo se aplica al canal que usa su conexión. Construí un pequeño script pika a al menos probarlo, pero cualquiera que me falta algo o que no se puede hacer de esta manera (¿qué tal con rabbitmqctl?)¿Cómo puedo recuperar mensajes AMQP no reconocidos de otros canales que no sean los de mi conexión?

import pika 

credentials = pika.PlainCredentials('***', '***') 
parameters = pika.ConnectionParameters(host='localhost',port=5672,\ 
    credentials=credentials, virtual_host='***') 

def handle_delivery(body): 
    """Called when we receive a message from RabbitMQ""" 
    print body 

def on_connected(connection): 
    """Called when we are fully connected to RabbitMQ""" 
    connection.channel(on_channel_open)  

def on_channel_open(new_channel): 
    """Called when our channel has opened""" 
    global channel 
    channel = new_channel 
    channel.basic_recover(callback=handle_delivery,requeue=True)  

try: 
    connection = pika.SelectConnection(parameters=parameters,\ 
     on_open_callback=on_connected)  

    # Loop so we can communicate with RabbitMQ 
    connection.ioloop.start() 
except KeyboardInterrupt: 
    # Gracefully close the connection 
    connection.close() 
    # Loop until we're fully closed, will stop on its own 
    connection.ioloop.start() 
+0

¿Ha sido capaz de resolver esto? – 13hsoj

+0

https://stackoverflow.com/questions/8296201/when-does-an-amqp-rabbitmq-channel-with-no-connections -die SO la respuesta tiene potencialmente lo que se necesita dependiendo de por qué usted tiene otros canales todavía dando vueltas con mensajes no contestados. Zombie channels No dup, ya que este tema trata de mensajes en otros canales, y no en los canales en sí. –

Respuesta

45

Mensajes no confirmados son aquellos que se han entregado a través de la red a un consumidor pero aún no ha sido rechazado o rechazado, pero ese consumidor aún no ha cerrado el canal o la conexión sobre el que originalmente los recibió. Por lo tanto, el agente no puede determinar si el consumidor simplemente está tardando mucho tiempo en procesar esos mensajes o si se olvidó de ellos. Por lo tanto, los deja en un estado no reconocido hasta que el consumidor muere o son rechazados o rechazados.

Dado que esos mensajes todavía podrían ser procesados ​​válidamente en el futuro por el consumidor aún vivo que los consumió originalmente, no puede (a mi conocimiento) insertar otro consumidor en la mezcla y tratar de tomar decisiones externas sobre ellos. Debe corregir a sus consumidores para que tomen decisiones sobre cada mensaje a medida que se procesan en lugar de dejar los mensajes antiguos sin acuse de recibo.

+0

para que el consumidor llame al básico.recover _? estoy usando apio para administrar conexiones. podría ser posible enviar ese comando de recuperación a colas poco receptivas con apiocelio (si está familiarizado con eso ...) –

+3

@ Mis condolencias a que esté usando Apio. Los desarrolladores de apio simplemente no entienden AMQP y han creado una implementación mal rota. Tienes que elegir, deshacerte del apio y hacerlo bien con AMQP, o dejar de usar AMQP con apio y usar algo simple como Redis. Elegí dejar caer el apio y quedarme con AMQP. –

+6

eso es toda una acusación. Si no le importa que pregunte, ¿qué pasa con la implementación de AMQP del apio que no se ejecuta correctamente? –

10

Si los mensajes son unacked sólo hay dos maneras de conseguir de nuevo en la cola:

  1. basic.nack

    Este comando hará que el mensaje para volver a colocarse en la cola y volver a ser entregado.

  2. Desconectar en el corredor de

    Esta acción forzará todos los mensajes unacked de este canal para volver a ponerse en la cola.

NOTA: basic.recover intentará volver a publicar mensajes unacked en el mismo canal (con el mismo consumidor), que a veces es el comportamiento deseado.

RabbitMQ spec for basic.recover and basic.nack


La verdadera pregunta es: ¿Por qué son los mensajes no reconocida?

escenarios posibles:

  1. Consumidor ir a buscar demasiados mensajes, entonces no procesando y acking con la suficiente rapidez.

    Solución: Recupere la menor cantidad posible de mensajes.

  2. biblioteca cliente Buggy (tengo este problema actualmente con pika 0.9.13. Si la cola tiene una gran cantidad de mensajes, un cierto número de mensajes se queda bloqueado unacked, incluso horas después.

    Solución : Tengo que reiniciar el consumidor varias veces hasta que todos los mensajes no procesados ​​desaparezcan de la cola.

+0

¿Se ha informado acerca de su problema con pika? ¿Puedes proporcionar un enlace? – istepaniuk

+0

Es un límite de recursión python que se inició. Algo sobre no poder recurse> 1000 veces, que es lo que aparentemente estaba sucediendo con pika 0.9.13. No lo veo con 0.9.14. – IvanD

+3

Finalmente encontrado donde se informó el problema: https://github.com/pika/pika/issues/286 – IvanD

2

Todos los mensajes no confirmados pasarán al estado listo una vez que se detengan todos los trabajadores/consumidores.

Asegúrese de detener a todos los trabajadores confirmando con grep en la salida ps aux y deteniéndolos/eliminándolos si se encuentran.

Si está administrando trabajadores que usan el supervisor, que se muestra como el trabajador está detenido, es posible que desee comprobar si hay zombies. El supervisor informa que el trabajador debe detenerse, pero de todos modos encontrará que los procesos zombies se ejecutan cuando se copian en ps aux output. Matar a los procesos zombies llevará los mensajes a un estado listo.

+0

También puede determinar si una conexión de conejo está siendo retenida por un proceso zombie, usando la consola de administración de RabbitMQ, como Describí aquí: http://stackoverflow.com/questions/11926077/rabbitmq-messages-remain-unacknowledged/43026774#43026774 –

Cuestiones relacionadas