2012-03-31 39 views
9

En py2.6 +, el módulo multiprocessing ofrece una clase Pool, por lo que uno puede hacer:multiprocesamiento y recolección de basura

class Volatile(object): 
    def do_stuff(self, ...): 
     pool = multiprocessing.Pool() 
     return pool.imap(...) 

Sin embargo, con la implementación estándar de Python en 2.7.2, este enfoque conduce pronto a "IOError: [Errno 24] Demasiados archivos abiertos". Aparentemente, el objeto pool nunca se recolecta basura, por lo que sus procesos nunca finalizan, acumulando los descriptores que se abren internamente. Creo que esto debido a que las siguientes obras:

class Volatile(object): 
    def do_stuff(self, ...): 
     pool = multiprocessing.Pool() 
     result = pool.map(...) 
     pool.terminate() 
     return result 

me gustaría mantener el enfoque iterador "perezosa" de imap; ¿Cómo funciona el recolector de basura en ese caso? ¿Cómo arreglar el código?

+0

le puede dar una pista sobre lo que el '...' 'está dentro de su pool.map (...)'? – SingleNegationElimination

+0

Claro. '...' son operaciones de solo lectura pero con uso intensivo de CPU en las variables miembro del objeto 'Volatile'. Me gustaría que estos se ejecuten en paralelo, para mejorar el rendimiento. El objeto no está mutado durante la duración de 'do_stuff'. – user124114

Respuesta

8

Al final, terminé pasando la referencia pool alrededor y que termina de forma manual una vez que el pool.imap iterador se terminó:

class Volatile(object): 
    def do_stuff(self, ...): 
     pool = multiprocessing.Pool() 
     return pool, pool.imap(...) 

    def call_stuff(self): 
     pool, results = self.do_stuff() 
     for result in results: 
      # lazy evaluation of the imap 
     pool.terminate() 

en caso de que alguien se topa con esta solución en el futuro: el parámetro es chunksize muy importante en Pool.imap (a diferencia de llanura Pool.map, donde no importó). Lo configuro manualmente para que cada proceso reciba 1 + len(input)/len(pool) trabajos. Dejándolo en el valor predeterminado chunksize=1 me dio el mismo rendimiento que si no usara el procesamiento paralelo en absoluto ... mal.

Supongo que no hay un beneficio real al usar imap ordenado frente a map, simplemente me gustan mejor los iteradores.

+0

No, quiero decir que me gustan los iteradores. Cada generador es un iterador, por cierto. – user124114

+0

En mi caso, tengo que llamar a 'pool.terminate()' para obtener 'gc.collect()' después de que funcione. De lo contrario, python simplemente no generará los objetos a los que se hace referencia en el grupo, incluso con 'del pool' explícito. –

3

En python, básicamente no tienes garantía de cuándo se destruirán las cosas, y en este caso, así no es como se diseñan las agrupaciones de multiprocesamiento.

Lo correcto es compartir un grupo único a través de múltiples llamadas a la función. La forma más sencilla de hacerlo es almacenar la piscina como una clase (o, tal vez, la instancia) Variable:

class Dispatcher: 
    pool = multiprocessing.Pool() 
    def do_stuff(self, ...): 
     result = self.pool.map(...) 
     return result 
+0

No 'Pool()' tenedor internamente? ¿Cómo podría su solución "actualizar" el estado de los procesos generados cuando se llama realmente a 'do_stuff()'? (a diferencia de cuando se evalúa 'Dispatcher') Suena bastante complicado mantener todo sincronizado a mano con el proceso maestro. – user124114

+0

Almacenar un grupo como variable miembro está bien; No entiendo su problema con el estado: ¿qué estado desea compartir? Si desea que sus procesos compartan el mismo estado de intérprete, probablemente debería usar hilos en su lugar ... – James

+1

Gracias @Aturó. Los hilos no hacen mucho, debido a GIL. El estado que deseo compartir es el objeto al que se llamó 'do_stuff' (= operación costosa en un objeto masivo de solo lectura, no puede permitirse copiar). – user124114

2

De hecho, incluso cuando se eliminan todas las referencias del usuario al objeto pool, y no hay tareas están en el código de la cola, y se realiza toda la recolección de basura, entonces todavía los procesos permanecen zombis como inutilizables en el sistema operativo - además tenemos hilos de servicio 3 de zombies de Pool colgante (Python 2.7 y 3.4):

>>> del pool 
>>> gc.collect() 
0 
>>> gc.garbage 
[] 
>>> threading.enumerate() 
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>, 
<Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>] 

y más Pool() 's va a añadir más y más el proceso y el hilo de zombis ... que permanecen hasta que se termine el proceso principal .

Se requiere un empuje especial para detener dicha agrupación zombi - a través de su servicio de hilo _handle_workers:

>>> ths = threading.enumerate() 
>>> for th in ths: 
...  try: th.name, th._state, th._Thread__target 
...  except AttributeError: pass 
...  
('MainThread', 1, None) 
('Thread-8', 0, <function _handle_tasks at 0x01462A30>) 
('Thread-9', 0, <function _handle_results at 0x014629F0>) 
('Thread-7', 0, <function _handle_workers at 0x01462A70>) 
>>> ths[-1]._state = multiprocessing.pool.CLOSE # or TERMINATE 
>>> threading.enumerate() 
[<_MainThread(MainThread, started 5632)>] 
>>> 

que termina los otros hilos de servicio y también termina los procesos hijos.


creo que uno de los problemas es que hay un error de fugas de recursos en la biblioteca de Python, lo que podría ser fijado por derecho de uso de weakref 's.

El otro punto es que Pool creación & terminación es caro (incluyendo 3 hilos de servicio por la piscina sólo para la gestión!), Y hay USusually hay razón para tener mucho más procesos de trabajo que los núcleos de CPU (cargas de alta CPU) o más que un número limitado de acuerdo con otro recurso de limitación (por ejemplo, ancho de banda de red). Por lo tanto, es razonable tratar un conjunto más como un recurso global de aplicación singular (administrado opcionalmente por un tiempo de espera) en lugar de un objeto quicky simplemente mantenido por un cierre (o un terminate() - solución debido al error).

Por ejemplo:

try: 
    _unused = pool # reload safe global var 
except NameError: 
    pool = None 

def get_pool(): 
    global pool 
    if pool is None: 
     atexit.register(stop_pool) 
     pool = Pool(CPUCORES) 
    return pool 

def stop_pool(): 
    global pool 
    if pool: 
     pool.terminate() 
     pool = None