2011-11-20 7 views
11

Dado que nadie proporcionó una solución a this post más el hecho de que necesito desesperadamente una solución alternativa, aquí está mi situación y algunas soluciones/ideas abstractas para el debate.Integración de apio tornado piratea

Mi pila:

  1. Tornado
  2. apio
  3. MongoDB
  4. Redis
  5. RabbitMQ

Mi problema: Encontrar una forma de tornado para despachar un apio tarea (resuelta) una d luego recoge asincrónicamente el resultado (¿alguna idea?)

Escenario 1: (petición/truco de respuesta más web hook)

  • Tornado recibe una solicitud (usuario), a continuación, guarda en la memoria local (o en Redis) a {de la Id: (usuario) solicitud} recordar dónde para propagar la respuesta, y dispara una tarea de apio con el de la Id
  • Cuando el apio completa la tarea, se realiza una web hook en algún url y le dice tornado que esto de la Id ha terminado (además de los resultados)
  • tornado recupera la (usuario) solicita y reenvía una respuesta al (usuario)

¿Puede suceder esto? ¿Tiene alguna lógica?

Escenario 2: (tornado más largo de votación)

  • Tornado envía la tarea de apio y devuelve algunos datos JSON primarios para el cliente (jQuery)
  • jQuery hace algo largo de votación a la recepción del json primario, digamos, cada x microsegundos, y el tornnado responde de acuerdo con alguna bandera de la base de datos. Cuando se completa la tarea de apio, esta bandera de la base de datos se establece en True, luego se completa el "bucle" de jQuery.

¿Es esto eficiente?

¿Alguna otra idea/esquema?

Respuesta

4

Me encontré con esta pregunta y presionar los resultados hacia atrás repetidamente no me pareció óptima. Así que implementé un Mixin similar a su Escenario 1 usando Sockets Unix.

Notifica a Tornado tan pronto como finaliza la tarea (para ser exacto, tan pronto como se ejecuta la siguiente tarea en la cadena) y solo resultados de retroceso una vez. Aquí está el link.

+0

¡Gran trabajo Eren! – hymloth

9

Mi solución consiste en el sondeo de tornado para el apio:

class CeleryHandler(tornado.web.RequestHandlerr): 

    @tornado.web.asynchronous 
    def get(self):  

     task = yourCeleryTask.delay(**kwargs) 

     def check_celery_task(): 
      if task.ready(): 
       self.write({'success':True}) 
       self.set_header("Content-Type", "application/json") 
       self.finish() 
      else: 
       tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

     tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

Aquí es post al respecto.

+0

¿podría volver a publicar el enlace del blog, que es tomado abajo! – vgoklani

+1

Editado para ser un enlace de archive.org – rbu

8

Aquí está nuestra solución al problema. Como buscamos resultados en varios manipuladores en nuestra aplicación, hicimos que la búsqueda de apio fuera mixin.

Esto también hace que el código sea más legible con el patrón tornado.gen.

from functools import partial 

class CeleryResultMixin(object): 
    """ 
    Adds a callback function which could wait for the result asynchronously 
    """ 
    def wait_for_result(self, task, callback): 
     if task.ready(): 
      callback(task.result) 
     else: 
      # TODO: Is this going to be too demanding on the result backend ? 
      # Probably there should be a timeout before each add_callback 
      tornado.ioloop.IOLoop.instance().add_callback(
       partial(self.wait_for_result, task, callback) 
      ) 


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler): 
    """Execute a task asynchronously over a celery worker. 
    Wait for the result without blocking 
    When the result is available send it back 
    """ 
    @tornado.web.asynchronous 
    @tornado.web.authenticated 
    @tornado.gen.engine 
    def post(self): 
     """Test the provided Magento connection 
     """ 
     task = expensive_task.delay(
      self.get_argument('somearg'), 
     ) 

     result = yield tornado.gen.Task(self.wait_for_result, task) 

     self.write({ 
      'success': True, 
      'result': result.some_value 
     }) 
     self.finish() 
3

Ahora, https://github.com/mher/tornado-celery viene a rescatar ...

class GenAsyncHandler(web.RequestHandler): 
    @asynchronous 
    @gen.coroutine 
    def get(self): 
     response = yield gen.Task(tasks.sleep.apply_async, args=[3]) 
     self.write(str(response.result)) 
     self.finish()