2012-02-29 19 views
12

Estoy usando Pika para procesar datos de RabbitMQ. Como me parecía encontrar diferentes tipos de problemas, decidí escribir una pequeña aplicación de prueba para ver cómo puedo manejar las desconexiones.RabbitMQ, Pika y estrategia de reconexión

Escribí esta aplicación de prueba que no siguiente:

  1. Conectar a Broker, vuelva a intentar hasta que tenga éxito
  2. Cuando se conecta a crear una cola.
  3. Consuma esta cola y coloque el resultado en python Queue.Queue (0)
  4. Obtiene el elemento de Queue.Queue (0) y lo vuelve a generar en la cola del intermediario.

Lo que noté eran 2 números:

  1. Cuando ejecuto mi script de un host se conecta a RabbitMQ en otro host (dentro de una máquina virtual) entonces este guiones salidas en momentos aleatorios sin producir un error.
  2. Cuando ejecuto mi script en el mismo host en el que RabbitMQ está instalado, se ejecuta correctamente y sigue funcionando.

Esto podría explicarse debido a problemas de red, paquetes caídos aunque encuentro que la conexión no es realmente robusta.

Cuando el script se ejecuta localmente en el servidor RabbitMQ y me matan el RabbitMQ entonces el script finaliza con el error: "ERROR pika SelectConnection: error de socket en 3: 104"

lo que parece que no puedo conseguir la estrategia de reconexión funciona como debería ser. ¿Alguien podría echarle un vistazo al código para ver lo que estoy haciendo mal?

Gracias,

Jay

#!/bin/python 
import logging 
import threading 
import Queue 
import pika 
from pika.reconnection_strategies import SimpleReconnectionStrategy 
from pika.adapters import SelectConnection 
import time 
from threading import Lock 

class Broker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.logging = logging.getLogger(__name__) 
     self.to_broker = Queue.Queue(0) 
     self.from_broker = Queue.Queue(0) 
     self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True) 
     self.srs = SimpleReconnectionStrategy() 
     self.properties = pika.BasicProperties(delivery_mode=2) 

     self.connection = None 
     while True: 
      try: 
       self.connection = SelectConnection(self.parameters, self.on_connected, reconnection_strategy=self.srs) 
       break 
      except Exception as err: 
       self.logging.warning('Cant connect. Reason: %s' % err) 
       time.sleep(1) 

     self.daemon=True 
    def run(self): 
     while True: 
      self.submitData(self.from_broker.get(block=True)) 
     pass 
    def on_connected(self,connection): 
     connection.channel(self.on_channel_open) 
    def on_channel_open(self,new_channel): 
     self.channel = new_channel 
     self.channel.queue_declare(queue='sandbox', durable=True) 
     self.channel.basic_consume(self.processData, queue='sandbox')  
    def processData(self, ch, method, properties, body): 
     self.logging.info('Received data from broker') 
     self.channel.basic_ack(delivery_tag=method.delivery_tag) 
     self.from_broker.put(body) 
    def submitData(self,data): 
     self.logging.info('Submitting data to broker.') 
     self.channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=self.properties) 
if __name__ == '__main__': 
    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 
    broker=Broker() 
    broker.start() 
    try: 
     broker.connection.ioloop.start() 
    except Exception as err: 
     print err 

Respuesta

17

El principal problema con el guión es que está interactuando con un solo canal de ambos, su hilo principal (donde el ioloop se está ejecutando) y el "Broker" hilo (llama a submitData en un bucle). Esto es not safe.

Además, SimpleReconnectionStrategy no parece hacer nada útil. No causa una reconexión si la conexión se interrumpe. Creo que esto es un error en Pika: https://github.com/pika/pika/issues/120

Intenté refacturar tu código para que funcione como creo que lo querías, pero se encontró con otro problema. Pika no parece tener una forma de detectar fallas en la entrega, lo que significa que los datos pueden perderse si la conexión se cae. ¡Este parece un requisito tan obvio! ¿Cómo no puede haber forma de detectar que basic_publish falló? Probé todo tipo de cosas, incluidas las transacciones y add_on_return_callback (todo lo cual parecía torpe y demasiado complicado), pero no se me ocurrió nada. Si realmente no hay forma, entonces pika solo parece ser útil en situaciones que pueden tolerar la pérdida de datos enviados a RabbitMQ, o en programas que solo necesitan consumir desde RabbitMQ.

Esto no es fiable, pero para referencia, aquí hay un código que resuelve su problema multi-hilo:

import logging 
import pika 
import Queue 
import sys 
import threading 
import time 
from functools import partial 
from pika.adapters import SelectConnection, BlockingConnection 
from pika.exceptions import AMQPConnectionError 
from pika.reconnection_strategies import SimpleReconnectionStrategy 

log = logging.getLogger(__name__) 

DEFAULT_PROPERTIES = pika.BasicProperties(delivery_mode=2) 


class Broker(object): 

    def __init__(self, parameters, on_channel_open, name='broker'): 
     self.parameters = parameters 
     self.on_channel_open = on_channel_open 
     self.name = name 

    def connect(self, forever=False): 
     name = self.name 
     while True: 
      try: 
       connection = SelectConnection(
        self.parameters, self.on_connected) 
       log.debug('%s connected', name) 
      except Exception: 
       if not forever: 
        raise 
       log.warning('%s cannot connect', name, exc_info=True) 
       time.sleep(10) 
       continue 

      try: 
       connection.ioloop.start() 
      finally: 
       try: 
        connection.close() 
        connection.ioloop.start() # allow connection to close 
       except Exception: 
        pass 

      if not forever: 
       break 

    def on_connected(self, connection): 
     connection.channel(self.on_channel_open) 


def setup_submitter(channel, data_queue, properties=DEFAULT_PROPERTIES): 
    def on_queue_declared(frame): 
     # PROBLEM pika does not appear to have a way to detect delivery 
     # failure, which means that data could be lost if the connection 
     # drops... 
     channel.confirm_delivery(on_delivered) 
     submit_data() 

    def on_delivered(frame): 
     if frame.method.NAME in ['Confirm.SelectOk', 'Basic.Ack']: 
      log.info('submission confirmed %r', frame) 
      # increasing this value seems to cause a higher failure rate 
      time.sleep(0) 
      submit_data() 
     else: 
      log.warn('submission failed: %r', frame) 
      #data_queue.put(...) 

    def submit_data(): 
     log.info('waiting on data queue') 
     data = data_queue.get() 
     log.info('got data to submit') 
     channel.basic_publish(exchange='', 
        routing_key='sandbox', 
        body=data, 
        properties=properties, 
        mandatory=True) 
     log.info('submitted data to broker') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


def blocking_submitter(parameters, data_queue, 
     properties=DEFAULT_PROPERTIES): 
    while True: 
     try: 
      connection = BlockingConnection(parameters) 
      channel = connection.channel() 
      channel.queue_declare(queue='sandbox', durable=True) 
     except Exception: 
      log.error('connection failure', exc_info=True) 
      time.sleep(1) 
      continue 
     while True: 
      log.info('waiting on data queue') 
      try: 
       data = data_queue.get(timeout=1) 
      except Queue.Empty: 
       try: 
        connection.process_data_events() 
       except AMQPConnectionError: 
        break 
       continue 
      log.info('got data to submit') 
      try: 
       channel.basic_publish(exchange='', 
          routing_key='sandbox', 
          body=data, 
          properties=properties, 
          mandatory=True) 
      except Exception: 
       log.error('submission failed', exc_info=True) 
       data_queue.put(data) 
       break 
      log.info('submitted data to broker') 


def setup_receiver(channel, data_queue): 
    def process_data(channel, method, properties, body): 
     log.info('received data from broker') 
     data_queue.put(body) 
     channel.basic_ack(delivery_tag=method.delivery_tag) 

    def on_queue_declared(frame): 
     channel.basic_consume(process_data, queue='sandbox') 

    channel.queue_declare(
     queue='sandbox', durable=True, callback=on_queue_declared) 


if __name__ == '__main__': 
    if len(sys.argv) != 2: 
     print 'usage: %s RABBITMQ_HOST' % sys.argv[0] 
     sys.exit() 

    format=('%(asctime)s %(levelname)s %(name)s %(message)s') 
    logging.basicConfig(level=logging.DEBUG, format=format) 

    host = sys.argv[1] 
    log.info('connecting to host: %s', host) 
    parameters = pika.ConnectionParameters(host=host, heartbeat=True) 
    data_queue = Queue.Queue(0) 
    data_queue.put('message') # prime the pump 

    # run submitter in a thread 

    setup = partial(setup_submitter, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'submitter') 
    thread = threading.Thread(target= 
     partial(broker.connect, forever=True)) 

    # uncomment these lines to use the blocking variant of the submitter 
    #thread = threading.Thread(target= 
    # partial(blocking_submitter, parameters, data_queue)) 

    thread.daemon = True 
    thread.start() 

    # run receiver in main thread 
    setup = partial(setup_receiver, data_queue=data_queue) 
    broker = Broker(parameters, setup, 'receiver') 
    broker.connect(forever=True) 
+0

Gracias por tomarse el tiempo de pasar por el código y encontrar todas las cuestiones relacionadas con ella. Actualmente estoy usando http://barryp.org/software/py-amqplib/ que es una biblioteca más básica/simple pero que se adapta a mis necesidades por completo. En combinación con gevent tengo algunos buenos resultados. Ya no me molesto más con Pika. –

+1

puede usar Channel.confirm_delivery() para esperar el bloqueo después de publicado, una vez que se cierra la conexión, se agotó el tiempo de espera y sabrá que el mensaje no se entregó al agente –

Cuestiones relacionadas