2012-09-07 16 views
15

Si tengo dos objetos threading.Event() y deseo dormir hasta que se establezca uno de ellos, ¿hay alguna forma eficiente de hacerlo en Python? Claramente, podría hacer algo con sondeos/tiempos de espera, pero me gustaría que el hilo duerma hasta que se establezca uno, similar a cómo se usa select para los descriptores de archivos.Enhebrado de Python: ¿puedo dormir en dos threading.Event() s simultáneamente?

Entonces, en la siguiente implementación, ¿cómo sería una implementación eficiente no de sondeo de wait_for_either?

a = threading.Event() 
b = threading.Event() 

wait_for_either(a, b) 
+0

¿Hay alguna buena razón para usar 2 eventos diferentes y no usar el mismo? –

+0

@Iulius tiene un único hilo que desea que sea controlado por un evento, pero tiene 2 colas ... por lo que debe activarse cuando q cualquiera obtiene un elemento – pyInTheSky

+0

Me sorprende que Python no lo tenga incorporado. –

Respuesta

18

Aquí es una solución de rosca no sondeo no excesiva: modificar los Event s existentes para disparar una devolución de llamada siempre que cambien, y manejar el establecimiento de un nuevo evento en que la devolución de llamada:

import threading 

def or_set(self): 
    self._set() 
    self.changed() 

def or_clear(self): 
    self._clear() 
    self.changed() 

def orify(e, changed_callback): 
    e._set = e.set 
    e._clear = e.clear 
    e.changed = changed_callback 
    e.set = lambda: or_set(e) 
    e.clear = lambda: or_clear(e) 

def OrEvent(*events): 
    or_event = threading.Event() 
    def changed(): 
     bools = [e.is_set() for e in events] 
     if any(bools): 
      or_event.set() 
     else: 
      or_event.clear() 
    for e in events: 
     orify(e, changed) 
    changed() 
    return or_event 

Ejemplo de uso:

def wait_on(name, e): 
    print "Waiting on %s..." % (name,) 
    e.wait() 
    print "%s fired!" % (name,) 

def test(): 
    import time 

    e1 = threading.Event() 
    e2 = threading.Event() 

    or_e = OrEvent(e1, e2) 

    threading.Thread(target=wait_on, args=('e1', e1)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('e2', e2)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('or_e', or_e)).start() 
    time.sleep(0.05) 

    print "Firing e1 in 2 seconds..." 
    time.sleep(2) 
    e1.set() 
    time.sleep(0.05) 

    print "Firing e2 in 2 seconds..." 
    time.sleep(2) 
    e2.set() 
    time.sleep(0.05) 

El resultado de las cuales era:

Waiting on e1... 
Waiting on e2... 
Waiting on or_e... 
Firing e1 in 2 seconds... 
e1 fired!or_e fired! 

Firing e2 in 2 seconds... 
e2 fired! 

Th debe ser seguro para subprocesos Cualquier comentario es bienvenido

EDITAR: Ah, y aquí está su función wait_for_either, aunque la forma en que escribí el código, es mejor hacer y pasar un or_event. Tenga en cuenta que el or_event no se debe configurar o borrar de forma manual.

def wait_for_either(e1, e2): 
    OrEvent(e1, e2).wait() 
+2

¡Esto es bonito! Sin embargo, veo un problema: si 'orify' el mismo evento dos veces, obtendrá un bucle infinito cada vez que lo configure o lo borre. – Vincent

+0

¡Ese es un buen punto! Modificará pronto – Claudiu

+0

¡Muchas gracias por esto! Es exactamente lo que estaba buscando. ¿Aceptaría dejar que el código en esta respuesta se use bajo términos de licencia de fuente abierta? BSD o MIT serían ideales, ya que son compatibles con Numpy, Pandas, Scipy, etc. – naitsirhc

4

Una solución (con el sondeo) sería hacer esperas secuenciales en cada Event en un bucle

def wait_for_either(a, b): 
    while True: 
     if a.wait(tunable_timeout): 
      break 
     if b.wait(tunable_timeout): 
      break 

creo que si sintoniza el tiempo de espera lo suficientemente bien los resultados serían OK.


La mejor de votación no se me ocurre es que esperar a que cada uno en un hilo diferente y establecer un compartían Event los cuales se debe esperar después en el hilo principal.

def repeat_trigger(waiter, trigger): 
    waiter.wait() 
    trigger.set() 

def wait_for_either(a, b): 
    trigger = threading.Event() 
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger)) 
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger)) 
    ta.start() 
    tb.start() 
    # Now do the union waiting 
    trigger.wait() 

bastante interesante, así que escribí una versión orientada a objetos de la solución anterior:

class EventUnion(object): 
    """Register Event objects and wait for release when any of them is set""" 
    def __init__(self, ev_list=None): 
     self._trigger = Event() 
     if ev_list: 
      # Make a list of threads, one for each Event 
      self._t_list = [ 
       Thread(target=self._triggerer, args=(ev,)) 
       for ev in ev_list 
      ] 
     else: 
      self._t_list = [] 

    def register(self, ev): 
     """Register a new Event""" 
     self._t_list.append(Thread(target=self._triggerer, args=(ev,))) 

    def wait(self, timeout=None): 
     """Start waiting until any one of the registred Event is set""" 
     # Start all the threads 
     map(lambda t: t.start(), self._t_list) 
     # Now do the union waiting 
     return self._trigger.wait(timeout) 

    def _triggerer(self, ev): 
     ev.wait() 
     self._trigger.set() 
+1

se puede hacer que repeat_trigger también verifique el trigger (con timeout = 0 para trigger y timeout> 0 para waiter) para que eventualmente todos los threads finalicen –

+0

estaba pensando lo mismo pero tiene que haber una manera mejor que comenzar 2 threads ... – Claudiu

0

No es bonita, pero se pueden utilizar dos hilos adicionales para multiplexar los eventos ...

def wait_for_either(a, b): 
    flag = False #some condition variable, event, or similar 

    class Event_Waiter(threading.Thread): 
    def __init__(self, event): 
     self.e = event 
    def run(self): 
     self.e.wait() 
     flag.set() 

    a_thread = Event_Waiter(a) 
    b_thread = Event_Waiter(b) 
    a.start() 
    b.start() 
    flag.wait() 

Nota: es posible que tenga que preocuparse por recibir accidentalmente ambos eventos si llegan demasiado rápido. Los hilos auxiliares (a_thread y b_thread) deberían sincronizarse alrededor de intentar establecer el marcador y luego deberían matar al otro hilo (posiblemente reiniciando el evento de ese hilo si se consumió).

1

Iniciar hilos adicionales parece ser una solución clara, aunque no muy eficiente. Función wait_events bloqueará la utilización de cualquiera de los eventos configurados.

def wait_events(*events): 
    event_share = Event() 

    def set_event_share(event): 
     event.wait() 
     event.clear() 
     event_share.set() 
    for event in events: 
     Thread(target=set_event_share(event)).start() 

    event_share.wait() 

wait_events(event1, event2, event3) 
+0

Sería bueno saber cuál se ha activado – Har

0
def wait_for_event_timeout(*events): 
    while not all([e.isSet() for e in events]): 
     #Check to see if the event is set. Timeout 1 sec. 
     ev_wait_bool=[e.wait(1) for e in events] 
     # Process if all events are set. Change all to any to process if any event set 
     if all(ev_wait_bool): 
      logging.debug('processing event') 
     else: 
      logging.debug('doing other work') 


e1 = threading.Event() 
e2 = threading.Event() 

t3 = threading.Thread(name='non-block-multi', 
         target=wait_for_event_timeout, 
         args=(e1,e2)) 
t3.start() 

logging.debug('Waiting before calling Event.set()') 
time.sleep(5) 
e1.set() 
time.sleep(10) 
e2.set() 
logging.debug('Event is set') 
1

Extendiendo Claudiu's respuesta en el que puede esperar para el evento 1 ó evento 2. 1 o evento e incluso 2.

from threading import Thread, Event, _Event 

class ConditionalEvent(_Event): 
    def __init__(self, events_list, condition): 
     _Event.__init__(self) 

     self.event_list = events_list 
     self.condition = condition 

     for e in events_list: 
      self._setup(e, self._state_changed) 

     self._state_changed() 

    def _state_changed(self): 
     bools = [e.is_set() for e in self.event_list] 
     if self.condition == 'or': 

      if any(bools): 
       self.set() 
      else: 
       self.clear() 

     elif self.condition == 'and': 

      if all(bools): 
       self.set() 
      else: 
       self.clear() 

    def _custom_set(self,e): 
     e._set() 
     e._state_changed() 

    def _custom_clear(self,e): 
     e._clear() 
     e._state_changed() 

    def _setup(self, e, changed_callback): 
     e._set = e.set 
     e._clear = e.clear 
     e._state_changed = changed_callback 
     e.set = lambda: self._custom_set(e) 
     e.clear = lambda: self._custom_clear(e) 

Ejemplo de uso será muy similar a la anterior

import time 

e1 = Event() 
e2 = Event() 

or_e = ConditionalEvent([e1, e2], 'or') 


Thread(target=wait_on, args=('e1', e1)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('e2', e2)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('or_e', or_e)).start() 
time.sleep(0.05) 

print "Firing e1 in 2 seconds..." 
time.sleep(2) 
e1.set() 
time.sleep(0.05) 

print "Firing e2 in 2 seconds..." 
time.sleep(2) 
e2.set() 
time.sleep(0.05) 
Cuestiones relacionadas