2012-08-23 10 views
6

Si esto es una pregunta idiota, me disculpo y saldrá a ocultar la cara de vergüenza, pero:Python/rq - el seguimiento de la condición de trabajador

estoy usando rq que hacer cola puestos de trabajo en Python. Quiero que funcione así:

  1. Empieza el trabajo A. Job A toma datos a través de la API web y los almacena.
  2. Se ejecuta el trabajo A.
  3. El trabajo A completa.
  4. Al finalizar A, se inicia el trabajo B. El trabajo B verifica cada registro almacenado por el trabajo A y agrega algunos datos de respuesta adicionales.
  5. Al finalizar el trabajo B, el usuario recibe un correo electrónico contento que dice que su informe está listo.

Mi código hasta ahora:

redis_conn = Redis() 
use_connection(redis_conn) 
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later 
w = Worker(q) 
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid) 
w.work() 

que supone mi mejor solución era tener 2 trabajadores, uno para el trabajo A y uno para B. El trabajador trabajo B podría supervisar el trabajo A y, cuando el trabajo A se realizó, comience en el trabajo B.

Lo que no puedo entender para salvar mi vida es cómo hago que un trabajador supervise el estado de otro. Puedo tomar la identificación del trabajo del trabajo A con job.id. Puedo tomar el nombre del trabajador con w.name. Pero no tengo ni la más mínima idea de cómo paso cualquiera de esa información al otro trabajador.

¿O hay una manera mucho más simple de hacer esto que me falta totalmente?

+1

Si el trabajo B no puede ejecutarse hasta que se complete el trabajo A (lo que implica que no se pueden ejecutar en paralelo), ¿por qué usar rq? Solo hazlos secuencialmente (en un hilo o proceso separado si no quieres bloquear tu aplicación) –

+0

Los trabajos para A y B llevan cada uno un tiempo muy largo, y pueden suceder por separado, así que me gustaría poder sigo corriendo un montón de trabajo A independientemente del trabajo B. Si es demasiado difícil, puedo rendirme. – user1066609

+0

¿Tiene pares de A y B que van juntos, o puede cualquier B depender de algún A? Porque en el último caso tienes un gran problema de sincronización. :-) –

Respuesta

0

Probablemente estés demasiado metido en tu proyecto para cambiar, pero si no, echa un vistazo a Twisted. http://twistedmatrix.com/trac/ Lo estoy usando ahora mismo para un proyecto que visita API, raspa el contenido web, etc. Ejecuta múltiples trabajos en paralelo, así como también organiza ciertos trabajos en orden, por lo que el Trabajo B no se ejecuta hasta que el Trabajo A finaliza.

Este es el mejor tutorial para aprender Twisted si quieres intentarlo. http://krondo.com/?page_id=1327

0

Combine las cosas que el trabajo A y el trabajo B hacen en una función, y luego use, p. multiprocessing.Pool (es map_async método) para cultivar eso en diferentes procesos.

No estoy familiarizado con rq, pero multiprocessing es parte de la biblioteca estándar. De forma predeterminada, utiliza tantos procesos como su CPU tiene núcleos, que en mi experiencia suele ser suficiente para saturar la máquina.

2

De this page en los documentos rq, parece que cada job objeto tiene un atributo result, exigible por job.result, que se puede comprobar. Si el trabajo no ha terminado, será None, pero si se asegura de que su trabajo arroje algún valor (incluso solo "Done"), entonces puede hacer que el otro trabajador verifique el resultado del primer trabajo y luego comience a trabajar solo cuando job.result tiene un valor, lo que significa que se completó el primer trabajador.

6

actualización de Enero de 2015, esta solicitud de extracción es ahora fusionada, y el parámetro se cambia el nombre a depends_on, es decir:

second_job = q.enqueue(email_customer, depends_on=first_job) 

El post original dejado intacto para las personas que utilicen versiones anteriores y tal:

He enviado una solicitud de extracción (https://github.com/nvie/rq/pull/207) para manejar dependencias de trabajos en RQ. Cuando esta solicitud de extracción se fusionó en, usted será capaz de hacer:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

first_job = q.enqueue(generate_report) 
second_job = q.enqueue(email_customer, after=first_job) 
# In the second enqueue call, job is created, 
# but only moved into queue after first_job finishes 

Por ahora, sugiero escribir una función de contenedor para ejecutar secuencialmente sus puestos de trabajo. Por ejemplo:

def generate_report(): 
    pass 

def email_customer(): 
    pass 

def generate_report_and_email(): 
    generate_report() 
    email_customer() # You can also enqueue this function, if you really want to 

# Somewhere else 
q.enqueue(generate_report_and_email) 
Cuestiones relacionadas