2012-04-01 13 views
19

Estoy usando el módulo multiprocessing de Python para procesar grandes matrices numpy en paralelo. Las matrices se mapean en memoria utilizando numpy.load(mmap_mode='r') en el proceso maestro. Después de eso, multiprocessing.Pool() bifurca el proceso (supongo).NumPy vs. multiprocesamiento y mmap

Todo parece funcionar bien, excepto que estoy recibiendo líneas como:

AttributeError ("objeto 'NoneType' no tiene atributo 'decirle'",) en <bound method memmap.__del__ of memmap([ 0.57735026, 0.57735026, 0.57735026, 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ], dtype=float32)> ignorado

en el Registros de prueba de unidad. Las pruebas pasan bien, sin embargo.

¿Alguna idea de lo que está pasando allí?

Usando Python 2.7.2, OS X, NumPy 1.6.1.


UPDATE:

Después de algún depuración, I perseguidos la causa a una ruta de código que se utiliza una (pequeña parte de) esta matriz numpy asignado en memoria como entrada a una llamada Pool.imap.

Aparentemente, el "problema" es con la forma en que multiprocessing.Pool.imap pasa su entrada a los nuevos procesos: usa pickle. Esto no funciona con mmap ed numpy arrays, y algo dentro de los saltos que conduce al error.

Encontré this reply de Robert Kern, que parece abordar el mismo problema. Sugiere crear una ruta de código especial para cuando la entrada imap proviene de una matriz mapeada en memoria: mapeo de memoria la misma matriz manualmente en el proceso engendrado.

Esto sería tan complicado y feo que preferiría vivir con el error y las copias de memoria adicionales. ¿Hay alguna otra manera que sería más ligero en la modificación del código existente?

Respuesta

22

Mi enfoque habitual (si puede vivir con copias de memoria adicionales) es hacer todo IO en un proceso y luego enviar cosas a un grupo de subprocesos de trabajo. Para cargar una porción de una matriz memmapped en la memoria, solo haga x = np.array(data[yourslice]) (data[yourslice].copy() en realidad no hace esto, lo que puede causar cierta confusión).

En primer lugar, vamos a generar algunos datos de prueba:

import numpy as np 
np.random.random(10000).tofile('data.dat') 

Puede reproducir sus errores con algo como esto:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield data[start:stop] 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

Y si sólo cambia a ceder np.array(data[start:stop]) En su lugar, arregle el problema:

import numpy as np 
import multiprocessing 

def main(): 
    data = np.memmap('data.dat', dtype=np.float, mode='r') 
    pool = multiprocessing.Pool() 
    results = pool.imap(calculation, chunks(data)) 
    results = np.fromiter(results, dtype=np.float) 

def chunks(data, chunksize=100): 
    """Overly-simple chunker...""" 
    intervals = range(0, data.size, chunksize) + [None] 
    for start, stop in zip(intervals[:-1], intervals[1:]): 
     yield np.array(data[start:stop]) 

def calculation(chunk): 
    """Dummy calculation.""" 
    return chunk.mean() - chunk.std() 

if __name__ == '__main__': 
    main() 

Por supuesto, esto hace un extra copia en memoria de cada fragmento.

A la larga, es probable que encuentre que es más fácil cambiar de archivos memmapped y pasar a algo como HDF. Esto es especialmente cierto si sus datos son multidimensionales. (Recomendaría h5py, pero pyTables es bueno si sus datos son "similares a los de una mesa".)

¡Buena suerte, en cualquier caso!

+0

Joe sus respuestas siempre rockean. Solo he estado tratando de descubrir algo como esto. – YXD

+0

Gracias por la punta HDF. Parece un gran cambio pero puede valer la pena, lo verificaré. – user124114