2011-12-15 19 views
10

que estoy usando multiprocessing.Pool()multiprocessing.pool.map y la función con dos argumentos

Esto es lo que quiero Piscina:

def insert_and_process(file_to_process,db): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,file_list,db) # here having problem. 

quiero pasar 2 argumentos Lo que quiero hacer es Inicialice solo 4 conexiones DB (aquí intentaré crear una conexión en cada llamada a la función, posiblemente millones de ellas y causar IO Freezed hasta la muerte). si puedo crear 4 conexiones de db y 1 para cada proceso, estará bien.

¿Hay alguna solución para Pool? o debería abandonarlo?

EDIT:

De ayuda de los dos tengo esto al hacer esto:

args=zip(f,cycle(dbs)) 
Out[-]: 
[('f1', 'db1'), 
('f2', 'db2'), 
('f3', 'db3'), 
('f4', 'db4'), 
('f5', 'db1'), 
('f6', 'db2'), 
('f7', 'db3'), 
('f8', 'db4'), 
('f9', 'db1'), 
('f10', 'db2'), 
('f11', 'db3'), 
('f12', 'db4')] 

Así que aquí la forma en que va a funcionar, me va a mover el código de conexión DB a la planta principal y haz esto:

def process_and_insert(args): 

    #Table Definations 
    args[1].table.insert(**parse_file(args[0])) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 

    dbs = [DAL("path_to_mysql/database") for i in range(0,3)] 
    args=zip(file_list,cycle(dbs)) 
    P.map(insert_and_process,args) # here having problem. 

Sí, voy a probarlo y que sepas.

Respuesta

26

la documentación Pool no dice de una manera de pasar más de un parámetro a la función objetivo - he tratado de paso una secuencia, pero no consigue desplegada (un elemento de la secuencia para cada parámetro).

Sin embargo, usted puede escribir su función objetivo a esperar el primer (y único) parámetro a ser una tupla, en la que cada elemento es uno de los parámetros que usted está esperando:

from itertools import repeat 

def insert_and_process((file_to_process,db)): 
    db = DAL("path_to_mysql" + db) 
    #Table Definations 
    db.table.insert(**parse_file(file_to_process)) 
    return True 

if __name__=="__main__": 
    file_list=os.listdir(".") 
    P = Pool(processes=4) 
    P.map(insert_and_process,zip(file_list,repeat(db))) 

(tenga en cuenta el extra paréntesis en la definición de insert_and_process - Python lo trata como un único parámetro que debe ser una secuencia de 2 elementos. El primer elemento de la secuencia se atribuye a la primera variable y el otro a la segunda)

+4

Tenga en cuenta que la sintaxis 'def f ((arg1, arg2)):' ha desaparecido en Python 3. –

+1

@FerdinandBeyer: Lo había olvidado. Bueno, a menos que la implementación de multiprocesamiento.Pool.map sea diferente allí, el camino a seguir sería asignar a un único argumento y descomprimirlo dentro de la función. – jsbueno

+0

Gracias lo tengo trabajando! Lo obtuve haciendo zip (file_list, cycle (dbs)). Pero no uso f ((arg1, arg2)). como utilicé más código, ¡te seleccioné! –

8

Su grupo generará cuatro procesos, cada uno ejecutado por su propia instancia del intérprete de Python. Se puede utilizar una variable global para mantener su objeto de conexión de base de datos, de manera que exactamente una conexión se crea por proceso:

global_db = None 

def insert_and_process(file_to_process, db): 
    global global_db 
    if global_db is None: 
     # If this is the first time this function is called within this 
     # process, create a new connection. Otherwise, the global variable 
     # already holds a connection established by a former call. 
     global_db = DAL("path_to_mysql" + db) 
    global_db.table.insert(**parse_file(file_to_process)) 
    return True 

Desde Pool.map() y amigos sólo son compatibles con las funciones de los trabajadores de un argumento, es necesario crear un envoltorio que reenvía la obra:

def insert_and_process_helper(args): 
    return insert_and_process(*args) 

if __name__ == "__main__": 
    file_list=os.listdir(".") 
    db = "wherever you get your db" 
    # Create argument tuples for each function call: 
    jobs = [(file, db) for file in file_list] 
    P = Pool(processes=4) 
    P.map(insert_and_process_helper, jobs) 
+0

Gracias Ferdinand, esto es lo que quiero. Lo que quiero hacer es crear 4 Conexiones DB. Una conexión para cada proceso, pero no para todas las llamadas de función. 'DAL (" Path To db ")' creará una conexión db. La conexión única sería más lenta que las conexiones cuádruples a la vez. –

+0

He intentado esos ejemplos y funcionó bien cuando la función no tiene que regresar ...; ¿No podemos hacer algo como my_var = P.map (insert_and_process_helper, jobs)? – neverMind

+0

@neverMind, por supuesto, puede –

5

No es necesario usar zipSi por ejemplo usted tiene 2 parámetros, X e Y, y cada uno de ellos puede obtener varios valores, como:

X=range(1,6) 
Y=range(10) 

La función debe recibir sólo un parámetro, y descomprimirlo en el interior:

def func(params): 
    (x,y)=params 
    ... 

y usted lo llama así:

params = [(x,y) for x in X for y in Y] 
pool.map(func, params) 
2

Usando

params=[(x,y) for x in X for y in Y] 

se crea una copia completa de x y y, y que puede ser más lento que usar

from itertools import repeat 
P.map(insert_and_process,zip(file_list,repeat(db))) 
1

Puede utilizar la biblioteca

from functools import partial 

para este fin

como

func = partial(rdc, lat, lng) 
r = pool.map(func, range(8)) 

y

def rdc(lat,lng,x): 
    pass 
Cuestiones relacionadas