Estoy usando Pika para procesar datos de RabbitMQ. Como me parecía encontrar diferentes tipos de problemas, decidí escribir una pequeña aplicación de prueba para ver cómo puedo manejar las desconexiones.RabbitMQ, Pika y estrategia de reconexión
Escribí esta aplicación de prueba que no siguiente:
- Conectar a Broker, vuelva a intentar hasta que tenga éxito
- Cuando se conecta a crear una cola.
- Consuma esta cola y coloque el resultado en python Queue.Queue (0)
- Obtiene el elemento de Queue.Queue (0) y lo vuelve a generar en la cola del intermediario.
Lo que noté eran 2 números:
- Cuando ejecuto mi script de un host se conecta a RabbitMQ en otro host (dentro de una máquina virtual) entonces este guiones salidas en momentos aleatorios sin producir un error.
- Cuando ejecuto mi script en el mismo host en el que RabbitMQ está instalado, se ejecuta correctamente y sigue funcionando.
Esto podría explicarse debido a problemas de red, paquetes caídos aunque encuentro que la conexión no es realmente robusta.
Cuando el script se ejecuta localmente en el servidor RabbitMQ y me matan el RabbitMQ entonces el script finaliza con el error: "ERROR pika SelectConnection: error de socket en 3: 104"
lo que parece que no puedo conseguir la estrategia de reconexión funciona como debería ser. ¿Alguien podría echarle un vistazo al código para ver lo que estoy haciendo mal?
Gracias,
Jay
#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock
class Broker(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.logging = logging.getLogger(__name__)
self.to_broker = Queue.Queue(0)
self.from_broker = Queue.Queue(0)
self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
self.srs = SimpleReconnectionStrategy()
self.properties = pika.BasicProperties(delivery_mode=2)
self.connection = None
while True:
try:
self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs)
break
except Exception as err:
self.logging.warning('Cant connect. Reason: %s' % err)
time.sleep(1)
self.daemon=True
def run(self):
while True:
self.submitData(self.from_broker.get(block=True))
pass
def on_connected(self,connection):
connection.channel(self.on_channel_open)
def on_channel_open(self,new_channel):
self.channel = new_channel
self.channel.queue_declare(queue='sandbox', durable=True)
self.channel.basic_consume(self.processData, queue='sandbox')
def processData(self, ch, method, properties, body):
self.logging.info('Received data from broker')
self.channel.basic_ack(delivery_tag=method.delivery_tag)
self.from_broker.put(body)
def submitData(self,data):
self.logging.info('Submitting data to broker.')
self.channel.basic_publish(exchange='',
routing_key='sandbox',
body=data,
properties=self.properties)
if __name__ == '__main__':
format=('%(asctime)s %(levelname)s %(name)s %(message)s')
logging.basicConfig(level=logging.DEBUG, format=format)
broker=Broker()
broker.start()
try:
broker.connection.ioloop.start()
except Exception as err:
print err
Gracias por tomarse el tiempo de pasar por el código y encontrar todas las cuestiones relacionadas con ella. Actualmente estoy usando http://barryp.org/software/py-amqplib/ que es una biblioteca más básica/simple pero que se adapta a mis necesidades por completo. En combinación con gevent tengo algunos buenos resultados. Ya no me molesto más con Pika. –
puede usar Channel.confirm_delivery() para esperar el bloqueo después de publicado, una vez que se cierra la conexión, se agotó el tiempo de espera y sabrá que el mensaje no se entregó al agente –