2011-08-26 28 views
41

Investigué primero y no pude encontrar una respuesta a mi pregunta. Estoy tratando de ejecutar múltiples funciones en paralelo en Python.Python: ¿Cómo puedo ejecutar las funciones de Python en paralelo?

que tienen algo como esto:

files.py 

import common #common is a util class that handles all the IO stuff 

dir1 = 'C:\folder1' 
dir2 = 'C:\folder2' 
filename = 'test.txt' 
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] 

def func1(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir1) 
     c.getFiles(dir1) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir1) 
     c.getFiles(dir1) 

def func2(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir2) 
     c.getFiles(dir2) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir2) 
     c.getFiles(dir2) 

Quiero llamar func1 y func2 y hacer que se ejecuten al mismo tiempo. Las funciones no interactúan entre sí o en el mismo objeto. Ahora mismo tengo que esperar a que func1 termine antes de que comience func2. ? ¿Cómo hago algo, como a continuación:

process.py 

from files import func1, func2 

runBothFunc(func1(), func2()) 

Quiero ser capaz de crear dos directorios muy cerca al mismo tiempo porque cada minuto que estoy contando cómo se están creando muchos archivos. Si el directorio no está allí, perderá mi tiempo.

+1

Actualizando la pregunta – lmcadory

+1

Es posible que desee volver a diseñar esto; Si está contando la cantidad de archivos/carpetas por minuto, está creando una condición de carrera. ¿Qué tal si cada función actualiza un contador o utiliza un archivo de bloqueo para garantizar que el proceso periódico no actualice el recuento hasta que ambas funciones hayan terminado de ejecutarse? –

Respuesta

73

Puede usar threading o multiprocessing.

Debido a peculiarities of CPython, threading es poco probable que logre un verdadero paralelismo. Por esta razón, multiprocessing es generalmente una mejor apuesta.

Aquí es un ejemplo completo:

from multiprocessing import Process 

def func1(): 
    print 'func1: starting' 
    for i in xrange(10000000): pass 
    print 'func1: finishing' 

def func2(): 
    print 'func2: starting' 
    for i in xrange(10000000): pass 
    print 'func2: finishing' 

if __name__ == '__main__': 
    p1 = Process(target=func1) 
    p1.start() 
    p2 = Process(target=func2) 
    p2.start() 
    p1.join() 
    p2.join() 

La mecánica de partida/procesos de unión niño fácilmente se pueden encapsular en una función a lo largo de las líneas de su runBothFunc:

def runInParallel(*fns): 
    proc = [] 
    for fn in fns: 
    p = Process(target=fn) 
    p.start() 
    proc.append(p) 
    for p in proc: 
    p.join() 

runInParallel(func1, func2) 
+2

Utilicé su código pero las funciones aún no se iniciaron al mismo tiempo. – lmcadory

+2

@Lamar McAdory: Por favor, explique a qué se refiere exactamente con "al mismo tiempo", tal vez dando un ejemplo concreto de lo que hizo, lo que esperaba que sucediera y lo que realmente sucedió. – NPE

+3

@Lamar: nunca puede tener ninguna garantía de "exactamente el mismo momento" y pensar que puede simplemente es incorrecto. Dependiendo de la cantidad de CPU que tenga, la carga de la máquina, el tiempo de muchas cosas que suceden en la computadora, todos tendrán una influencia en el momento en que comiencen los hilos/proceso. Además, dado que los procesos se inician inmediatamente después de la creación, la sobrecarga de crear un proceso también debe calcularse en la diferencia de tiempo que ve. – Martin

3

No hay manera de garantiza que dos funciones se ejecutarán sincronizadas entre sí, lo que parece ser lo que quieres hacer.

Lo mejor que puede hacer es dividir la función en varios pasos, luego esperar a que ambos terminen en puntos de sincronización críticos usando Process.join como las menciones de respuesta de @ aix.

Esto es mejor que time.sleep(10) porque no puede garantizar los tiempos exactos. Con espera explícita, usted está diciendo que las funciones deben realizarse ejecutando ese paso antes de pasar al siguiente, en lugar de asumir que se realizará dentro de los 10 ms, lo que no está garantizado en función de qué más está sucediendo en la máquina.

3

Si usted es un usuario de Windows y utiliza Python 3, esta publicación lo ayudará a hacer programación paralela en python. Cuando ejecuta una programación de grupo de biblioteca de multiprocesamiento, obtendrá un error con respecto a la función principal en su programa . Esto se debe a que Windows no tiene funcionalidad fork(). La publicación de abajo está dando una solución al problema mencionado.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Desde que estaba usando la pitón 3, he cambiado el programa un poco como esto:

from types import FunctionType 
import marshal 

def _applicable(*args, **kwargs): 
    name = kwargs['__pw_name'] 
    code = marshal.loads(kwargs['__pw_code']) 
    gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls']) 
    defs = marshal.loads(kwargs['__pw_defs']) 
    clsr = marshal.loads(kwargs['__pw_clsr']) 
    fdct = marshal.loads(kwargs['__pw_fdct']) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    del kwargs['__pw_name'] 
    del kwargs['__pw_code'] 
    del kwargs['__pw_defs'] 
    del kwargs['__pw_clsr'] 
    del kwargs['__pw_fdct'] 
    return func(*args, **kwargs) 

def make_applicable(f, *args, **kwargs): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    kwargs['__pw_name'] = f.__name__ # edited 
    kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited 
    kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited 
    kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited 
    kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited 
    return _applicable, args, kwargs 

def _mappable(x): 
    x,name,code,defs,clsr,fdct = x 
    code = marshal.loads(code) 
    gbls = globals() #gbls = marshal.loads(gbls) 
    defs = marshal.loads(defs) 
    clsr = marshal.loads(clsr) 
    fdct = marshal.loads(fdct) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    return func(x) 

def make_mappable(f, iterable): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    name = f.__name__ # edited 
    code = marshal.dumps(f.__code__) # edited 
    defs = marshal.dumps(f.__defaults__) # edited 
    clsr = marshal.dumps(f.__closure__) # edited 
    fdct = marshal.dumps(f.__dict__) # edited 
    return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable) 

Después de esta función, el código de problema anterior también se cambia un poco como esto:

from multiprocessing import Pool 
from poolable import make_applicable, make_mappable 

def cube(x): 
    return x**3 

if __name__ == "__main__": 
    pool = Pool(processes=2) 
    results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)] 
    print([result.get(timeout=10) for result in results]) 

y me dieron el resultado que:

[1, 8, 27, 64, 125, 216] 

Estoy pensando que esta publicación puede ser útil para algunos de los usuarios de Windows.

Cuestiones relacionadas