2012-01-01 14 views
5

Estoy tratando de escribir una implementación aparentemente simple de la clásica expresión de productor - consumidor en Python. Hay un productor comparablemente rápido para consumidores múltiples más lentos. En principio, este es fácil de usar usando el módulo Queue, y la documentación de la biblioteca tiene un ejemplo que genera solo unas pocas líneas de código.Python productor/consumidor con manejo de excepción

Sin embargo, también quiero que el código funcione correctamente en caso de que se produzcan excepciones. Tanto el productor y todos los consumidores deben parar en caso de que alguna de las siguientes cosas:

  • el productor falla con una excepción
  • cualquier consumidor falla con una excepción
  • el usuario deja el programa (que causa una KeyboardInterrupt)

Después de eso, todo el proceso debe fallar lanzar la excepción inicial para informar a la persona que llama lo que salió mal.

El principal desafío parece ser terminar limpiamente el hilo del consumidor sin terminar en en una unión de bloqueo(). Parece ser popular configurar Thread.deamon = True, pero para mi esto causa fugas de recursos en caso de que el productor falle con una excepción.

Me las arreglé para escribir una implementación que cumpla con mis requisitos (ver a continuación). Sin embargo, el código es mucho más complejo de lo esperado.

¿Hay alguna manera más sencilla de manejar este escenario?

Aquí hay un par de ejemplos de las llamadas y los mensajes de registro final resultante de mi actual aplicación:

producen y consumen 10 artículos:

$ python procon.py 
INFO:root:processed all items 

no producen artículos:

$ python procon.py --items 0 
INFO:root:processed all items 

Produce 5 artículos para 10 consumidores, utilizando solo algunos de los consumidores disponibles:

$ python procon.py --items 5 --consumers 10 
INFO:root:processed all items 

de interrupción pulsando Control-C:

$ python procon.py 
^CWARNING:root:interrupted by user 

dejar de producir el punto 3:

$ python procon.py --producer-fails-at 3 
ERROR:root:cannot produce item 3 

Falla para consumir artículo 3:

$ python procon.py --consumer-fails-at 3 
ERROR:root:cannot consume item 3 

Falla para consumir la último artículo:

$ python procon.py --items 10 --consumer-fails-at 9 
ERROR:root:cannot consume item 9 

Y aquí está el código fuente probablemente demasiado complejo:

""" 
Consumer/producer to test exception handling in threads. Both the producer 
and the consumer can be made to fail deliberately when processing a certain 
item using command line options. 
""" 
import logging 
import optparse 
import Queue 
import threading 
import time 

_PRODUCTION_DELAY = 0.1 
_CONSUMPTION_DELAY = 0.3 

# Delay for ugly hacks and polling loops. 
_HACK_DELAY = 0.05 

class _Consumer(threading.Thread): 
    """ 
    Thread to consume items from an item queue filled by a producer, which can 
    be told to terminate in two ways: 

    1. using `finish()`, which keeps processing the remaining items on the 
     queue until it is empty 
    2. using `cancel()`, which finishes consuming the current item and then 
     terminates 
    """ 
    def __init__(self, name, itemQueue, failedConsumers): 
     super(_Consumer, self).__init__(name=name) 
     self._log = logging.getLogger(name) 
     self._itemQueue = itemQueue 
     self._failedConsumers = failedConsumers 
     self.error = None 
     self.itemToFailAt = None 
     self._log.info(u"waiting for items to consume") 
     self._isFinishing = False 
     self._isCanceled = False 

    def finish(self): 
     self._isFinishing = True 

    def cancel(self): 
     self._isCanceled = True 

    def consume(self, item): 
     self._log.info(u"consume item %d", item) 
     if item == self.itemToFailAt: 
      raise ValueError("cannot consume item %d" % item) 
     time.sleep(_CONSUMPTION_DELAY) 

    def run(self): 
     try: 
      while not (self._isFinishing and self._itemQueue.empty()) \ 
        and not self._isCanceled: 
       # HACK: Use a timeout when getting the item from the queue 
       # because between `empty()` and `get()` another consumer might 
       # have removed it. 
       try: 
        item = self._itemQueue.get(timeout=_HACK_DELAY) 
        self.consume(item) 
       except Queue.Empty: 
        pass 
      if self._isCanceled: 
       self._log.info(u"canceled") 
      if self._isFinishing: 
       self._log.info(u"finished") 
     except Exception, error: 
      self._log.error(u"cannot continue to consume: %s", error) 
      self.error = error 
      self._failedConsumers.put(self) 


class Worker(object): 
    """ 
    Controller for interaction between producer and consumers. 
    """ 
    def __init__(self, itemsToProduceCount, itemProducerFailsAt, 
      itemConsumerFailsAt, consumerCount): 
     self._itemsToProduceCount = itemsToProduceCount 
     self._itemProducerFailsAt = itemProducerFailsAt 
     self._itemConsumerFailsAt = itemConsumerFailsAt 
     self._consumerCount = consumerCount 
     self._itemQueue = Queue.Queue() 
     self._failedConsumers = Queue.Queue() 
     self._log = logging.getLogger("producer") 
     self._consumers = [] 

    def _possiblyRaiseConsumerError(self): 
      if not self._failedConsumers.empty(): 
       failedConsumer = self._failedConsumers.get() 
       self._log.info(u"handling failed %s", failedConsumer.name) 
       raise failedConsumer.error 

    def _cancelAllConsumers(self): 
     self._log.info(u"canceling all consumers") 
     for consumerToCancel in self._consumers: 
      consumerToCancel.cancel() 
     self._log.info(u"waiting for consumers to be canceled") 
     for possiblyCanceledConsumer in self._consumers: 
      # In this case, we ignore possible consumer errors because there 
      # already is an error to report. 
      possiblyCanceledConsumer.join(_HACK_DELAY) 
      if possiblyCanceledConsumer.isAlive(): 
       self._consumers.append(possiblyCanceledConsumer) 

    def work(self): 
     """ 
     Launch consumer thread and produce items. In case any consumer or the 
     producer raise an exception, fail by raising this exception 
     """ 
     self.consumers = [] 
     for consumerId in range(self._consumerCount): 
      consumerToStart = _Consumer(u"consumer %d" % consumerId, 
       self._itemQueue, self._failedConsumers) 
      self._consumers.append(consumerToStart) 
      consumerToStart.start() 
      if self._itemConsumerFailsAt is not None: 
       consumerToStart.itemToFailAt = self._itemConsumerFailsAt 

     self._log = logging.getLogger("producer ") 
     self._log.info(u"producing %d items", self._itemsToProduceCount) 

     for itemNumber in range(self._itemsToProduceCount): 
      self._possiblyRaiseConsumerError() 
      self._log.info(u"produce item %d", itemNumber) 
      if itemNumber == self._itemProducerFailsAt: 
       raise ValueError("ucannot produce item %d" % itemNumber) 
      # Do the actual work. 
      time.sleep(_PRODUCTION_DELAY) 
      self._itemQueue.put(itemNumber) 

     self._log.info(u"telling consumers to finish the remaining items") 
     for consumerToFinish in self._consumers: 
      consumerToFinish.finish() 
     self._log.info(u"waiting for consumers to finish") 
     for possiblyFinishedConsumer in self._consumers: 
      self._possiblyRaiseConsumerError() 
      possiblyFinishedConsumer.join(_HACK_DELAY) 
      if possiblyFinishedConsumer.isAlive(): 
       self._consumers.append(possiblyFinishedConsumer) 


if __name__ == "__main__": 
    logging.basicConfig(level=logging.INFO) 
    parser = optparse.OptionParser() 
    parser.add_option("-c", "--consumer-fails-at", metavar="NUMBER", 
     type="long", help="number of items at which consumer fails (default: %default)") 
    parser.add_option("-i", "--items", metavar="NUMBER", type="long", 
     help="number of items to produce (default: %default)", default=10) 
    parser.add_option("-n", "--consumers", metavar="NUMBER", type="long", 
     help="number of consumers (default: %default)", default=2) 
    parser.add_option("-p", "--producer-fails-at", metavar="NUMBER", 
     type="long", help="number of items at which producer fails (default: %default)") 
    options, others = parser.parse_args() 
    worker = Worker(options.items, options.producer_fails_at, 
     options.consumer_fails_at, options.consumers) 
    try: 
     worker.work() 
     logging.info(u"processed all items") 
    except KeyboardInterrupt: 
     logging.warning(u"interrupted by user") 
     worker._cancelAllConsumers() 
    except Exception, error: 
     logging.error(u"%s", error) 
     worker._cancelAllConsumers() 
+0

Tal vez no lo que está buscando, pero hay una gran biblioteca de Python llamada de apio que se puede utilizar en lugar de escribir su implementación propia de puesta en cola. –

+0

Gracias por el puntero. El apio parece interesante para tareas complejas usando servicios web y bases de datos. Para mi tarea particular, el productor lee las líneas de un archivo y hace un análisis estructural básico y pasa los datos a los consumidores, por lo que en su mayoría se trata de un trabajo intensivo de E/S. Los consumidores procesan los datos haciendo un trabajo intensivo de CPU. Como todo esto tiene lugar en la memoria en la misma máquina, la cola estándar de Python parece estar bien para. – roskakori

Respuesta

0

Como las respuestas hasta ahora me dieron buenas pistas pero me faltaba código de trabajo, tomé el código de mi pregunta y lo envolví en una biblioteca, que está disponible en http://pypi.python.org/pypi/proconex/. Puede encontrar el código fuente en https://github.com/roskakori/proconex. Si bien la interfaz se siente sensata, la implementación aún usa encuestas, por lo que las contribuciones son bienvenidas.

Cualquier excepción en un hilo productor o consumidor se vuelve a plantear en el hilo principal. Solo asegúrese de usar la declaración with o finally:worker.close() para asegurarse de que todos los hilos se cierren correctamente.

Aquí hay un pequeño ejemplo para un productor con dos consumidores para los números enteros:

import logging 
import proconex 

class IntegerProducer(proconex.Producer): 
    def items(self): 
     for item in xrange(10): 
      logging.info('produce %d', item) 
      yield item 

class IntegerConsumer(proconex.Consumer): 
    def consume(self, item): 
     logging.info('consume %d with %s', item, self.name) 

if __name__ == '__main__': 
    logging.basicConfig(level=logging.INFO) 
    producer = IntegerProducer() 
    consumer1 = IntegerConsumer('consumer1') 
    consumer2 = IntegerConsumer('consumer2') 

    with proconex.Worker(producer, [consumer1, consumer2]) as worker: 
     worker.work() 
2

Se necesita una cola con un método cancel que vacía la cola interna, establece un indicador cancelado, y luego se despierta a todo el mundo. El trabajador se despertará de join(), revisará el indicador cancelado en la cola y actuará de manera apropiada. Los consumidores se despertarán de get() y verificarán el indicador cancelado en la cola e imprimirán un error. Entonces su consumidor solo necesitaría llamar al método cancel() en caso de una excepción.

Desafortunadamente, Python Queue no tiene un método de cancelación. Unas pocas opciones saltan a la mente:

  • rodar su propia cola (puede ser complicado para hacerlo bien)
  • Extender la cola de serpiente pitón y añadir la cancelación de método (parejas el código para la implementación interna de la cola de Python clase)
  • Proxy la clase de cola y sobrecarga join/get con su lógica de espera ocupada (sigue siendo un hack de espera ocupado, pero lo confina en un solo lugar y limpia el código productor/consumidor)
  • Buscar otra implementación de cola/biblioteca por ahí
+0

Sí, al mover la lógica de cancelación a la cola, ciertamente se limpiará el código del trabajador. Teniendo en cuenta mis requisitos, la cola también debería poder recordar la posible información de excepción porque quiero que los consumidores informen el error al trabajador, no solo que lo impriman. Pero eso ciertamente se puede hacer. ¿Alguien sabe de una implementación existente de tal cola? – roskakori

Cuestiones relacionadas