2012-02-09 19 views
5

Tengo alguna función de productor que se basa en llamadas de bloqueo de E/S pesadas y algunas funciones de consumidor que también dependen de llamadas de bloqueo de E/S pesadas. Para acelerarlos, utilicé la biblioteca de microhilo Gevent como pegamento.¿Cómo puedo implementar un paradigma de múltiples productores y consumidores múltiples en Gevent?

Este es el aspecto de mi paradigma como:

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 

def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 

def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 



for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

#This doesnt work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

#Uncommenting this makes this script work. 
#producer() 

q.join() 

tengo cuatro consumidores y me gustaría tener dos productores. Los productores salen cuando hacen una señal, es decir, 10. Los consumidores continúan alimentándose de esta cola y la tarea completa termina cuando los productores y los consumidores terminan.

Sin embargo, esto no funciona. Si hago un comentario en el bucle for que genera múltiples productores y usa solo un único productor, el script funciona bien.

Parece que no puedo entender lo que hice mal.

¿Alguna idea?

Gracias

Respuesta

6

En realidad, no desea salir cuando la cola no tiene un trabajo sin terminar, porque conceptualmente no es así cuando la aplicación debe finalizar.

Quiere salir cuando los productores han terminado, y luego cuando no hay un trabajo sin terminar.

# Wait for all producers to finish producing 
gevent.joinall(producers) 
# *Now* we want to make sure there's no unfinished work 
q.join() 
# We don't care about workers. We weren't paying them anything, anyways 
gevent.killall(workers) 
# And, we're done. 
3

creo que lo hace q.join() antes de que algo se pone en la cola y se cierra inmediatamente. Intenta unirte a todos los productores antes de unirte a la cola.

+0

Hola ZCH, no he seguido en su totalidad su respuesta. ¿Podrías pegar un pequeño fragmento por favor? Eso aclararía las cosas un poco. –

+0

@MridangAgarwalla - Antes 'q.join()' write 'para productor en productores: productor.join()'. De esta forma, primero espere hasta que todos los productores finalicen su trabajo y luego hasta que la cola esté vacía. – zch

+0

Aha, tal vez lo he implementado mal. Quería que mis productores y consumidores se ejecutaran concurrentemente, es decir, que los productores continúen agregando a la cola hasta que terminen mientras los consumidores se alimentan de ella hasta que todos los artículos de cola estén terminados y los productores ya no agreguen cosas a la cola. –

0

Lo que quiere hacer es bloquear el programa principal mientras los productores y los trabajadores se comunican. El bloqueo en la cola esperará hasta que la cola esté vacía y luego ceda, lo que podría ser inmediatamente. Poner esto en el final de su programa en lugar de q.join()

gevent.joinall(producers) 
0

He conocido mismos problemas como la suya. El problema principal con su código era que su productor se generó en el hilo gevent que hace que el trabajador no pueda obtener la tarea inmediatamente.

Sugiero que debe ejecutar producer() en el proceso principal no generar en gevent thread Cuando el proceso se ejecutó conoció al productor que podría impulsar la tarea inmediatamente.

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 

def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 

def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


producer() 

for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

códigos anteriores tiene sentido .. :)

Cuestiones relacionadas