2012-03-07 19 views
6

Soy bastante nuevo en python. Estoy usando el módulo de multiprocesamiento para leer líneas de texto en stdin, convirtiéndolas de alguna manera y escribiéndolas en una base de datos. He aquí un fragmento de mi código:python pool apply_async y map_async no bloquean en cola completa

batch = [] 
pool = multiprocessing.Pool(20) 
i = 0 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     pool.apply_async(insert, args=(batch,i+1)) 
     batch = [] 
pool.apply_async(insert, args=(batch,i)) 
pool.close() 
pool.join() 

Ahora que todo funciona bien, hasta que llego a procesar enormes archivos de entrada (cientos de millones de líneas) que la tubería i en mi programa pitón. En algún momento, cuando mi base de datos se vuelve más lenta, veo que la memoria se está llenando.

Después de jugar un poco, resultó que pool.apply_async y pool.map_async nunca bloquean, por lo que la cola de las llamadas a procesar crece cada vez más.

¿Cuál es el enfoque correcto para mi problema? Esperaría un parámetro que pueda establecer, que bloqueará la llamada pool.apply_async, tan pronto como se haya alcanzado una determinada longitud de cola. AFAIR en Java uno puede darle al ThreadPoolExecutor una BlockingQueue con una longitud fija para ese propósito.

Gracias!

+1

_ "Resultó que pool.apply_async, así como pool.map_async nunca bloquear" _ - todo lo que estaba buscando – leon

Respuesta

2

apply_async devuelve un objeto AsyncResult, que se puede wait en:

if len(batch) >= 10000: 
    r = pool.apply_async(insert, args=(batch, i+1)) 
    r.wait() 
    batch = [] 

, aunque si quieres hacer esto de una manera más limpia, se debe utilizar un multiprocessing.Queue con un maxsize de 10000, y derivar una Worker clase de multiprocessing.Process que obtiene de dicha cola.

+1

bien esperando en el AsyncResult no ayuda ya que mi problema es que la cola en el La piscina crece a lo grande. Me pregunto si puedo controlar el tamaño de la cola interna en el grupo. – konstantin

+0

@konstantin: No estoy seguro de entender. Mientras esperas el 'AsyncResult', el proceso maestro no puede completar el siguiente lote, ¿verdad? –

9

Por si acaso alguien termina aquí, así es como resolví el problema: dejé de usar multiprocesamiento.Pool. Aquí es cómo lo hago ahora:

#set amount of concurrent processes that insert db data 
processes = multiprocessing.cpu_count() * 2 

#setup batch queue 
queue = multiprocessing.Queue(processes * 2) 

#start processes 
for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() 

#fill queue with batches  
batch=[] 
for i, content in enumerate(sys.stdin): 
    batch.append(content) 
    if len(batch) >= 10000: 
     queue.put((batch,i+1)) 
     batch = [] 
if batch: 
    queue.put((batch,i+1)) 

#stop processes using poison-pill 
for _ in range(processes): queue.put((None,None)) 

print "all done." 

en el método de inserción el procesamiento de cada lote se envuelve en un bucle que se saca de la cola hasta que recibe la píldora de veneno:

while True: 
    batch, end = queue.get() 
    if not batch and not end: return #poison pill! complete! 
    [process the batch] 
print 'worker done.' 
+0

Buen ejemplo simple. El grupo de multiprocesamiento con frecuencia es más problemático de lo que vale, especialmente porque crear su propio grupo de procesos es bastante simple. – travc

8

El apply_async y las funciones map_async están diseñadas para no bloquear el proceso principal. Para hacerlo, el Pool mantiene un Queue interno cuyo tamaño es lamentablemente imposible de cambiar.

La forma en que se puede resolver el problema es mediante el uso de un Semaphore inicializado con el tamaño que desea que sea la cola. Adquiere y suelta el semáforo antes de alimentar el grupo y después de que un trabajador haya completado la tarea.

Aquí hay un ejemplo que trabaja con Python 2.6 o superior.

from threading import Semaphore 
from multiprocessing import Pool 

def task_wrapper(f): 
    """Python2 does not allow a callback for method raising exceptions, 
    this wrapper ensures the code run into the worker will be exception free. 

    """ 
    try: 
     return f() 
    except: 
     return None 

def TaskManager(object): 
    def __init__(self, processes, queue_size): 
     self.pool = Pool(processes=processes) 
     self.workers = Semaphore(processes + queue_size) 

    def new_task(self, f): 
     """Start a new task, blocks if queue is full.""" 
     self.workers.acquire() 
     self.pool.apply_async(task_wrapper, args=(f,), callback=self.task_done)) 

    def task_done(self): 
     """Called once task is done, releases the queue is blocked.""" 
     self.workers.release()