Estoy utilizando procesos y cola de multiprocesamiento. Empiezo varias funciones en paralelo y la mayoría se comportan bien: terminan, su salida va a su cola y se muestran como .is_alive() == False. Pero por alguna razón, un par de funciones no se comportan. Siempre muestran .is_alive() == Verdadero, incluso después de completar la última línea de la función (una instrucción de impresión que dice "Finalizado"). Esto sucede independientemente del conjunto de funciones que ejecute, incluso si solo hay una. Si no se ejecuta en paralelo, las funciones se comportan bien y regresan normalmente. ¿Qué tipo de la cosa podría ser el problema?multiproceso de python: algunas funciones no se devuelven cuando están completas (material de cola demasiado grande)
Aquí está la función genérica que estoy usando para administrar los trabajos. Todo lo que no estoy mostrando son las funciones que le estoy pasando. Son largos, a menudo usan matplotlib, algunas veces ejecutan comandos de shell, pero no puedo entender qué tienen en común los errores.
def runFunctionsInParallel(listOf_FuncAndArgLists):
"""
Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
"""
from multiprocessing import Process, Queue
def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
que.put(fff(*theArgs)) #we're putting return value into queue
print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
# We get this far even for "bad" functions
return
queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
for job in jobs: job.start() # Launch them all
import time
from math import sqrt
n=1
while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
n+=1
time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
print('---------------------------------------------------\n')
# I never get to the following line when one of the "bad" functions is running.
for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
# And now, collect all the outputs:
return([queue.get() for queue in queues])
Toma completa en la oscuridad: ¿Los que cuelgan devuelven un valor? (literalmente, ¿tienen 'return' en ellos?) – Logan
Todas las funciones, buenas y malas, devuelven una sola cadena (larga). – CPBL
Sin embargo, si elimino el uso de Colas, el problema desaparece. Entonces ... una cola se ha llenado. Puedo verlo, y se ve bien, pero de alguna manera el trabajo no termina cuando hay una cola asociada (y solo para funciones "malas"). – CPBL