2011-07-12 4 views
10

Estoy tratando de usar una cola con la biblioteca de multiprocesamiento en Python. Después de ejecutar el código a continuación (las instrucciones de impresión funcionan), pero los procesos no se cierran después de que llamo a join en la cola y todavía están vivos. ¿Cómo puedo finalizar los procesos restantes?Cola de multiprocesamiento en Python

Gracias!

def MultiprocessTest(self): 
    print "Starting multiprocess." 
    print "Number of CPUs",multiprocessing.cpu_count() 

    num_procs = 4 
    def do_work(message): 
    print "work",message ,"completed" 

    def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

    q = multiprocessing.JoinableQueue() 
    for i in range(num_procs): 
    p = multiprocessing.Process(target=worker) 
    p.daemon = True 
    p.start() 

    source = ['hi','there','how','are','you','doing'] 
    for item in source: 
    q.put(item) 
    print "q close" 
    q.join() 
    #q.close() 
    print "Finished everything...." 
    print "num active children:",multiprocessing.active_children() 

Respuesta

7

intente esto:

import multiprocessing 

num_procs = 4 
def do_work(message): 
    print "work",message ,"completed" 

def worker(): 
    for item in iter(q.get, None): 
    do_work(item) 
    q.task_done() 
    q.task_done() 

q = multiprocessing.JoinableQueue() 
procs = [] 
for i in range(num_procs): 
    procs.append(multiprocessing.Process(target=worker)) 
    procs[-1].daemon = True 
    procs[-1].start() 

source = ['hi','there','how','are','you','doing'] 
for item in source: 
    q.put(item) 

q.join() 

for p in procs: 
    q.put(None) 

q.join() 

for p in procs: 
    p.join() 

print "Finished everything...." 
print "num active children:", multiprocessing.active_children() 
+0

¿Hay alguna razón por la que no coloque a None en la cola después de finalizar? Pensé que task_done() podría ayudar a evitar ese problema. Estaba tratando de modelar mi código después del ejemplo en la parte inferior de esta página: http://docs.python.org/library/queue.html – aerain

+0

Esto realmente no funciona :( – aerain

+0

No calificando la solución, pero haciendo alusión a cómo hacer que funcione: mover la línea de declaración "q =" antes de su primer uso en def worker() ... ;-) – Dilettant

3

se deben eliminar la cola antes de unirse al proceso, pero q.empty() no es confiable.

La mejor manera de borrar la cola es contar el número de get o loop exitosos hasta que reciba un valor centinela, como un socket con una red confiable.

6

Sus trabajadores necesitan un centinela para terminar, o simplemente se sentarán en las lecturas de bloqueo. Tenga en cuenta que el uso de dormir en el Q en lugar de unirse a la P le permite visualizar la información de estado, etc.
Mi plantilla seleccionada es:

def worker(q,nameStr): 
    print 'Worker %s started' %nameStr 
    while True: 
    item = q.get() 
    if item is None: # detect sentinel 
     break 
    print '%s processed %s' % (nameStr,item) # do something useful 
    q.task_done() 
    print 'Worker %s Finished' % nameStr 
    q.task_done() 

q = multiprocessing.JoinableQueue() 
procs = [] 
for i in range(num_procs): 
    nameStr = 'Worker_'+str(i) 
    p = multiprocessing.Process(target=worker, args=(q,nameStr)) 
    p.daemon = True 
    p.start() 
    procs.append(p) 

source = ['hi','there','how','are','you','doing'] 
for item in source: 
    q.put(item) 

for i in range(num_procs): 
    q.put(None) # send termination sentinel, one for each process 

while not q.empty(): # wait for processing to finish 
    sleep(1) # manage timeouts and status updates etc. 
+1

aunque no es q.empty(), no es una forma confiable de saber que el procesamiento ha finalizado, solo cuando un trabajador toma la última pieza de trabajo por hacer. Francamente, con la forma en que usas incorrectamente JoinableQueue, no necesitas JoinableQueue. Si elige no usar uno, no necesitaría los hilos de trabajo para marcar task_done. El propósito de usar dicha cola es para que pueda unirse a ella, que es exactamente lo que quiere hacer al final de este programa en lugar de esperar a que la cola esté vacía. – leetNightshade

+0

Sí, con este método, el trabajo termina prematuramente. – Forethinker

1

El código siguiente puede no ser muy relevante pero post-it para sus comentarios/retroalimentaciones para que podamos aprender juntos. ¡Gracias!

import multiprocessing 

def boss(q,nameStr): 
    source = range(1024) 
    for item in source: 
    q.put(nameStr+' '+str(item)) 
    q.put(None) # send termination sentinel, one for each process 

def worker(q,nameStr): 
    while True: 
    item = q.get() 
    if item is None: # detect sentinel 
     break 
    print '%s processed %s' % (nameStr,item) # do something useful 

q = multiprocessing.Queue() 

procs = [] 

num_procs = 4 
for i in range(num_procs): 
    nameStr = 'ID_'+str(i) 
    p = multiprocessing.Process(target=worker, args=(q,nameStr)) 
    procs.append(p) 
    p = multiprocessing.Process(target=boss, args=(q,nameStr)) 
    procs.append(p) 

for j in procs: 
    j.start() 
for j in procs: 
    j.join() 
0

Aquí es una centinela de libre método para la relativamente simple caso en el que se pone una serie de tareas en un JoinableQueue, a continuación, iniciar los procesos de trabajo que consumen las tareas y la salida una vez que leen la cola "en seco" . El truco es usar JoinableQueue.get_nowait() en lugar de get(). get_nowait(), como su nombre lo indica, intenta obtener un valor de la cola de forma no bloqueante y si no se obtiene nada, se genera una excepción queue.Empty. El trabajador maneja esta excepción al salir.

código rudimentario para ilustrar el principio:

import multiprocessing as mp 
from queue import Empty 

def worker(q): 
    while True: 
    try: 
     work = q.get_nowait() 
     # ... do something with `work` 
     q.task_done() 
    except Empty: 
     break # completely done 

# main 
worknum = 4 
jq = mp.JoinableQueue() 

# fill up the task queue 
# let's assume `tasks` contains some sort of data 
# that your workers know how to process 
for task in tasks: 
    jq.put(task) 

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ] 
for p in procs: 
    p.start() 

for p in procs: 
    p.join() 

La ventaja es que no es necesario para poner las "píldoras venenosas" en la cola de lo que el código es un poco más corto.

IMPORTANTE: en situaciones más complejas en las que los productores y los consumidores utilizan la misma cola de una manera "intercalado" y los trabajadores pueden tener que esperar a nuevas tareas a venir, el enfoque de "píldora venenosa" debe ser usado. Mi sugerencia anterior es para casos simples en los que los trabajadores "saben" que si la cola de tareas está vacía, entonces ya no tiene sentido dar vueltas.

Cuestiones relacionadas