2012-08-07 23 views
22

Estoy utilizando procesos y cola de multiprocesamiento. Empiezo varias funciones en paralelo y la mayoría se comportan bien: terminan, su salida va a su cola y se muestran como .is_alive() == False. Pero por alguna razón, un par de funciones no se comportan. Siempre muestran .is_alive() == Verdadero, incluso después de completar la última línea de la función (una instrucción de impresión que dice "Finalizado"). Esto sucede independientemente del conjunto de funciones que ejecute, incluso si solo hay una. Si no se ejecuta en paralelo, las funciones se comportan bien y regresan normalmente. ¿Qué tipo de la cosa podría ser el problema?multiproceso de python: algunas funciones no se devuelven cuando están completas (material de cola demasiado grande)

Aquí está la función genérica que estoy usando para administrar los trabajos. Todo lo que no estoy mostrando son las funciones que le estoy pasando. Son largos, a menudo usan matplotlib, algunas veces ejecutan comandos de shell, pero no puedo entender qué tienen en común los errores.

def runFunctionsInParallel(listOf_FuncAndArgLists): 
    """ 
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order. 
    """ 
    from multiprocessing import Process, Queue 

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue 
     print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name 
     que.put(fff(*theArgs)) #we're putting return value into queue 
     print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name 
     # We get this far even for "bad" functions 
     return 

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function 
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)] 
    for job in jobs: job.start() # Launch them all 
    import time 
    from math import sqrt 
    n=1 
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates 
     n+=1 
     time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs. 
     print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------') 
     print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)])) 
     print('---------------------------------------------------\n') 
    # I never get to the following line when one of the "bad" functions is running. 
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues? 
    # And now, collect all the outputs: 
    return([queue.get() for queue in queues]) 
+1

Toma completa en la oscuridad: ¿Los que cuelgan devuelven un valor? (literalmente, ¿tienen 'return' en ellos?) – Logan

+0

Todas las funciones, buenas y malas, devuelven una sola cadena (larga). – CPBL

+0

Sin embargo, si elimino el uso de Colas, el problema desaparece. Entonces ... una cola se ha llenado. Puedo verlo, y se ve bien, pero de alguna manera el trabajo no termina cuando hay una cola asociada (y solo para funciones "malas"). – CPBL

Respuesta

14

bien, parece que el tubo usado para llenar la cola se tapa cuando la salida de una función es demasiado grande (mi entendimiento crudo? Esto es un fallo sin resolver/cerrado? http://bugs.python.org/issue8237). He modificado el código en mi pregunta para que haya algo de almacenamiento en búfer (las colas se vacían regularmente mientras se están ejecutando los procesos), lo que resuelve todos mis problemas. Entonces ahora esto toma una colección de tareas (funciones y sus argumentos), los lanza y recopila los resultados. Desearía que fuera más simple/limpia.

Editar (2014 Sep; actualización 2017 Nov: reescrito para facilitar la lectura): estoy actualizando el código con las mejoras que he realizado desde entonces. El nuevo código (la misma función, pero mejores características) está aquí: https://github.com/cpbl/cpblUtilities/blob/master/parallel.py

La descripción de la llamada también se encuentra debajo.

def runFunctionsInParallel(*args, **kwargs): 
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments. 
    """ 
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs() 

########################################################################################### 
### 
class cRunFunctionsInParallel(): 
    ### 
    ####################################################################################### 
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel. 
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied. 
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name. 
Parameters 
---------- 
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs], 
    specifying the set of functions to be launched in parallel. If an 
    element is just a function, rather than a list, then it is assumed 
    to have no arguments or keyword arguments. Thus, possible formats 
    for elements of the outer list are: 
     function 
     [function, list] 
     [function, list, dict] 
kwargs: dict 
    One can also supply the kwargs once, for all jobs (or for those 
    without their own non-empty kwargs specified in the list) 
names: an optional list of names to identify the processes. 
    If omitted, the function name is used, so if all the functions are 
    the same (ie merely with different arguments), then they would be 
    named indistinguishably 
offsetsSeconds: int or list of ints 
    delay some functions' start times 
expectNonzeroExit: True/False 
    Normal behaviour is to not proceed if any function exits with a 
    failed exit code. This can be used to override this behaviour. 
parallel: True/False 
    Whenever the list of functions is longer than one, functions will 
    be run in parallel unless this parameter is passed as False 
maxAtOnce: int 
    If nonzero, this limits how many jobs will be allowed to run at 
    once. By default, this is set according to how many processors 
    the hardware has available. 
showFinished : int 
    Specifies the maximum number of successfully finished jobs to show 
    in the text interface (before the last report, which should always 
    show them all). 
Returns 
------- 
Returns a tuple of (return codes, return values), each a list in order of the jobs provided. 
Issues 
------- 
Only tested on POSIX OSes. 
Examples 
-------- 
See the testParallel() method in this module 
    """ 
+1

"Si esto no funciona, tal vez las cosas que está devolviendo de sus funciones no sean seleccionables y, por lo tanto, no puedan pasar las colas correctamente". Enorme ayuda, tuve este problema exacto y no sabía que 'multiprocesamiento' se basa en decapado para pasar objetos entre procesos (incluidos los resultados de retorno). – Michael

+0

Solo una sugerencia, pero debe invertir un poco de tiempo en hacer esto legible.Probablemente hay algunas cosas realmente útiles aquí, pero es casi imposible de decir. –

+0

Sí, usamos esta función una tonelada, para gran efecto. Puede que no sepa cómo hacerlo legible, pero le daré otra oportunidad. Gracias. https://github.com/cpbl/cpblUtilities/issues/10 – CPBL

Cuestiones relacionadas