2010-08-04 10 views
9

Tengo un número de subprocesos que esperan en un evento, realizan alguna acción, luego esperan en el evento nuevamente. Otro hilo activará el evento cuando sea apropiado.Python threading.Event() - Asegurando que todos los subprocesos en espera se activen en event.set()

No puedo encontrar la forma de garantizar que cada hebra en espera se active exactamente una vez sobre el evento que se está configurando. Actualmente tengo el hilo desencadenante configurado, dormir un poco, luego despejarlo. Desafortunadamente, esto lleva a que los hilos de espera agarren el evento establecido muchas veces, o ninguno.

No puedo simplemente hacer que el hilo desencadenante engendre los subprocesos de respuesta para ejecutarlos una vez porque son respuestas a solicitudes realizadas desde otros lugares.

En resumen: en Python, ¿cómo puedo hacer que un hilo establezca un evento y asegurar que cada hilo en espera actúa sobre el evento exactamente una vez antes de que se borre?

Actualización:

He intentado configurarlo usando una cerradura y una cola, pero no funciona. Aquí es lo que tengo:

# Globals - used to synch threads 
waitingOnEvent = Queue.Queue 
MainEvent = threading.Event() 
MainEvent.clear() # Not sure this is necessary, but figured I'd be safe 
mainLock = threading.Lock() 

def waitCall(): 
    mainLock.acquire() 
    waitingOnEvent.put("waiting") 
    mainLock.release() 
    MainEvent.wait() 
    waitingOnEvent.get(False) 
    waitingOnEvent.task_done() 
    #do stuff 
    return 

def triggerCall(): 
    mainLock.acquire() 
    itemsinq = waitingOnEvent.qsize() 
    MainEvent.set() 
    waitingOnEvent.join() 
    MainEvent.clear() 
    mainLock.release() 
    return 

La primera vez, itemsinq refleja correctamente el número de llamadas están a la espera, pero sólo el primer hilo de espera para realizar la llamada va a salir adelante. Desde entonces, itemsinq siempre es 1, y los hilos de espera se turnan; cada vez que ocurre la llamada de activación, la siguiente pasa.

Actualización 2 Parece como si sólo una de la event.wait() hilos está despertando , y sin embargo el queue.join) está trabajando (. Esto me sugiere que un hilo de espera se despierta, toma de la cola y llama a task_done(), y que get()/task_done() solo vacía la cola y permite el join(). El hilo desencadenante luego completa el join(), borra el evento y así evita que los otros hilos en espera pasen. Sin embargo, ¿por qué la cola se registraría como vacía/terminada después de una sola llamada get/task_done?

Parece que solo uno está despertando, incluso si hago un comentario en queue.get() y queue.task_done() y cuelgo el desencadenador para que no pueda despejar el evento.

+0

Esto suena como un posible defecto de diseño, probablemente haya una mejor manera de lograr lo que está tratando de lograr. ¿Puedes publicar más detalles sobre lo que se supone que deben hacer los hilos? –

Respuesta

8

No necesita un evento, y no necesita tanto un bloqueo como una cola. Todo lo que necesitas es una cola.

Llame al queue.put para dejar un mensaje sin esperar a que se entregue o se procese.

Llame al queue.get en el subproceso de trabajo para esperar a que llegue un mensaje.

import threading 
import Queue 

active_queues = [] 

class Worker(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.mailbox = Queue.Queue() 
     active_queues.append(self.mailbox) 

    def run(self): 
     while True: 
      data = self.mailbox.get() 
      if data == 'shutdown': 
       print self, 'shutting down' 
       return 
      print self, 'received a message:', data 

    def stop(self): 
     active_queues.remove(self.mailbox) 
     self.mailbox.put("shutdown") 
     self.join() 


def broadcast_event(data): 
    for q in active_queues: 
     q.put(data) 

t1 = Worker() 
t2 = Worker() 
t1.start() 
t2.start() 
broadcast_event("first event") 
broadcast_event("second event") 
broadcast_event("shutdown") 

t1.stop() 
t2.stop() 

Los mensajes no tienen que ser cadenas; pueden ser cualquier objeto Python.

+0

Esto supone que hay un número estático de hilos en espera, todo definido al comienzo, que no es el caso. Los hilos de espera son generados por las solicitudes, por lo que el número es dinámico y desconocido. El bloqueo estaba destinado a evitar que los hilos de espera adicionales se agreguen, vean el conjunto de eventos, se eliminen y se realicen, especialmente dado que esto podría, teóricamente, evitar que la cola se vacíe. – culhantr

+0

@culhantr OK, reescribí el ejemplo. –

1

No soy un programador de Python pero si un evento solo puede procesarse una vez quizás necesite cambiar a una cola de mensajes con el bloqueo apropiado para que cuando un hilo se despierte y reciba el mensaje de evento lo procese y quítelo de la cola para que no esté allí si otros subprocesos se despiertan y miran en la cola.

2

Una solución que he usado en el pasado es la clase Queue para la comunicación interthread. Es threadsafe y se puede usar para facilitar la comunicación entre subprocesos al usar las bibliotecas de multiprocesamiento y subprocesamiento. Puede hacer que los subprocesos hijo esperen que algo entre en la cola y luego procesar la nueva entrada. La clase Queue también tiene un método get() que toma un práctico argumento de bloqueo.

3

Si desea eventos atómicos discretos que puedan procesarse secuencialmente por cada hilo, haga lo siguiente krs1 & bot403 y utilice una cola. La clase Python Queue está sincronizada; no tiene que preocuparse por el bloqueo para usarla.

Sin embargo, si sus necesidades son más simples (el evento le dice que tiene datos disponibles para leer, etc.), puede suscribir/registrar sus hilos como observadores de un objeto responsable de desencadenar los eventos.Este objeto mantendría la lista de objetos del observador threading.Event. En un desencadenador, puede llamar a set() en todos los objetos threading.Event en la lista.

Cuestiones relacionadas