2011-12-11 10 views
8

estoy usando Redis junto con mi solicitud Tornado con el cliente ASYC Brukva, cuando miraba a las aplicaciones de ejemplo en Brukva sitio que están haciendo nueva conexión en el método "init" en WebSocket¿Cuál es la forma correcta de manejar la conexión Redis en Tornado? (Asíncrono - Pub/Sub)

class MessagesCatcher(tornado.websocket.WebSocketHandler): 
    def __init__(self, *args, **kwargs): 
     super(MessagesCatcher, self).__init__(*args, **kwargs) 
     self.client = brukva.Client() 
     self.client.connect() 
     self.client.subscribe('test_channel') 

    def open(self): 
     self.client.listen(self.on_message) 

    def on_message(self, result): 
     self.write_message(str(result.body)) 

    def close(self): 
     self.client.unsubscribe('test_channel') 
     self.client.disconnect() 

está bien en el caso de websocket pero cómo manejarlo en el método de publicación Tornado RequestHandler común dice operación de sondeo larga (modelo de publicación-suscripción). Estoy haciendo una nueva conexión de cliente en cada método de publicación de controlador de actualización ¿este es el enfoque correcto? Cuando revisé en la consola redis veo que los clientes aumentan en cada nueva operación de publicación.

enter image description here

Aquí es una muestra de mi código.

c = brukva.Client(host = '127.0.0.1') 
c.connect() 

class MessageNewHandler(BaseHandler): 
    @tornado.web.authenticated 
    def post(self): 

     self.listing_id = self.get_argument("listing_id") 
     message = { 
      "id": str(uuid.uuid4()), 
      "from": str(self.get_secure_cookie("username")), 
      "body": str(self.get_argument("body")), 
     } 
     message["html"] = self.render_string("message.html", message=message) 

     if self.get_argument("next", None): 
      self.redirect(self.get_argument("next")) 
     else: 
      c.publish(self.listing_id, message) 
      logging.info("Writing message : " + json.dumps(message)) 
      self.write(json.dumps(message)) 

    class MessageUpdatesHandler(BaseHandler): 
     @tornado.web.authenticated 
     @tornado.web.asynchronous 
     def post(self): 
      self.listing_id = self.get_argument("listing_id", None) 
      self.client = brukva.Client() 
      self.client.connect() 
      self.client.subscribe(self.listing_id) 
      self.client.listen(self.on_new_messages) 

     def on_new_messages(self, messages): 
      # Closed client connection 
      if self.request.connection.stream.closed(): 
       return 
      logging.info("Getting update : " + json.dumps(messages.body)) 
      self.finish(json.dumps(messages.body)) 
      self.client.unsubscribe(self.listing_id) 


     def on_connection_close(self): 
      # unsubscribe user from channel 
      self.client.unsubscribe(self.listing_id) 
      self.client.disconnect() 

aprecio si usted proporciona un código de ejemplo para el caso similar.

+0

PubSub asincrónico en Python usando Redis, ZMQ, Tornado - https://github.com/abhinavsingh/async_pubsub –

Respuesta

2

debe agrupar las conexiones en su aplicación. ya que parece que brukva no admite esto automáticamente (redis-py lo admite, pero está bloqueando por naturaleza, por lo que no funciona bien con tornado), debe escribir su propio grupo de conexiones.

el patrón es bastante simple, sin embargo. algo en este sentido (este no es el código de operación real):

class BrukvaPool(): 

    __conns = {} 


    def get(host, port,db): 
     ''' Get a client for host, port, db ''' 

     key = "%s:%s:%s" % (host, port, db) 

     conns = self.__conns.get(key, []) 
     if conns: 
      ret = conns.pop() 
      return ret 
     else: 
      ## Init brukva client here and connect it 

    def release(client): 
     ''' release a client at the end of a request ''' 
     key = "%s:%s:%s" % (client.connection.host, client.connection.port, client.connection.db) 
     self.__conns.setdefault(key, []).append(client) 

puede ser un poco más complejo, pero esa es la idea principal.

9

Un poco tarde pero, he estado usando tornado-redis. Funciona con ioloop del tornado y el tornado.gen módulo

Instalar tornadoredis

que se puede instalar desde pip

pip install tornadoredis 

o con setuptools

easy_install tornadoredis 

pero que realmente no debería Haz eso. También puedes clonar el repositorio y extraerlo. A continuación, ejecute

python setup.py build 
python setup.py install 

Conectar a Redis

El siguiente código va en su main.py o equivalente

redis_conn = tornadoredis.Client('hostname', 'port') 
redis_conn.connect() 

redis.connect se llama sólo una vez. Es una llamada de bloqueo, por lo que debe llamarse antes de iniciar el ioloop principal. El mismo objeto de conexión se comparte entre todos los controladores.

Se podría añadir a tus parámetros de la aplicación como

settings = { 
    redis = redis_conn 
} 
app = tornado.web.Application([('/.*', Handler),], 
           **settings) 

Uso tornadoredis

La conexión se puede utilizar en los manipuladores en self.settings['redis'] o se puede añadir como una propiedad de la BaseHandler y subclase esa clase para otros manejadores de solicitudes.

class BaseHandler(tornado.web.RequestHandler): 

    @property 
    def redis(): 
     return self.settings['redis'] 

Para comunicarse con Redis, se utilizan los tornado.web.asynchronous y los tornado.gen.engine decoradores

class SomeHandler(BaseHandler): 

    @tornado.web.asynchronous 
    @tornado.gen.engine 
    def get(self): 
     foo = yield gen.Task(self.redis.get, 'foo') 
     self.render('sometemplate.html', {'foo': foo} 

información adicional

Más ejemplos y otras características como la agrupación de conexiones y tuberías se pueden encontrar en la github repo.

Cuestiones relacionadas