2012-06-24 17 views
19

estoy usando multiprocessing.imap_unordered para realizar un cálculo en una lista de valores:multiprocesamiento de Python y la memoria

def process_parallel(fnc, some_list): 
    pool = multiprocessing.Pool() 
    for result in pool.imap_unordered(fnc, some_list): 
     for x in result: 
      yield x 
    pool.terminate() 

Cada llamada a fnc devuelve un objeto enorme, como resultado, por diseño. Puedo almacenar N instancias de dicho objeto en la RAM, donde N ~ cpu_count, pero no mucho más (no cientos).

Ahora, el uso de esta función ocupa demasiada memoria. La memoria se gasta por completo en el proceso principal, no en los trabajadores.

¿Cómo almacena imap_unordered los resultados finales? Me refiero a los resultados que ya fueron devueltos por los trabajadores pero que aún no se han transmitido al usuario. Pensé que era inteligente y solo los calculé "perezosamente" según las necesidades, pero aparentemente no.

Parece que como no puedo consumir los resultados de process_parallel lo suficientemente rápido, la agrupación sigue haciendo cola estos objetos enormes desde fnc en algún lugar, internamente, y luego explota. Hay alguna manera de evitar esto? Limitar su cola interna de alguna manera?


Estoy usando Python2.7. Aclamaciones.

+0

Bueno, de lo que veo 'rendimiento' es en el proceso principal, no dentro de' fnc' (es decir, la función hecha por los trabajadores). ¿'fnc' está haciendo una evaluación perezosa? – Felix

+0

@FelixBonkoski No, 'fnc' toma un solo elemento de' some_list', y calcula y devuelve un gran objeto de él. – user124114

+0

Solo límite de velocidad en función de la memoria disponible. –

Respuesta

10

Como puede ver al buscar en el archivo fuente correspondiente (python2.7/multiprocessing/pool.py), IMapUnorderedIterator utiliza una instancia collections.deque para almacenar los resultados. Si aparece un nuevo elemento, se agrega y elimina en la iteración.

Como sugirió, si entra otro objeto enorme mientras el hilo principal aún está procesando el objeto, también se almacenarán en la memoria.

Lo que se podría probar es algo como esto:

it = pool.imap_unordered(fnc, some_list) 
for result in it: 
    it._cond.acquire() 
    for x in result: 
     yield x 
    it._cond.release() 

Esto debería hacer la tarea-resultado-receptor-hilo se bloquean mientras se procesa un artículo si está tratando de poner el siguiente objeto en el deque. Por lo tanto, no debe haber más de dos de los objetos grandes en la memoria. Si funciona para su caso, no lo sé;)

+0

No estoy siguiendo este ejemplo, ¿no es 'it' simplemente un generador y, como tal, no tendrá los métodos' _cond.acquire() 'y' release'? Si necesita escribirlos usted mismo, ¿qué tipo de objeto necesita '._cond'? – Hooked

+0

Parece que el usuario se preocupa por el rendimiento ¿por qué limitarlo a un número pequeño con un simple bloqueo? –

+0

@Hooked: 'imap_unordered' devuelve un' IMapUnorderedIterator', que tiene estas funciones como se puede ver con una mirada en el código fuente correspondiente. Dado que el hilo receptor-resultado (al recibir un resultado) requiere que el bloqueo ingrese el resultado en el deque, esto bloqueará el hilo y evitará que consuma más memoria. – rumpel

2

La solución más simple que puedo pensar sería añadir un cierre para envolver su función fnc que utilizaría un semáforo para controlar el número total de trabajo simultánea ejecuciones que pueden ejecutarse de una vez (supongo que el proceso/subproceso principal incrementaría el semáforo). El valor del semáforo se puede calcular en función del tamaño del trabajo y la memoria disponible.

Cuestiones relacionadas