2011-09-05 11 views
11

Tengo una matriz Numpy 256x256x256, en la que cada elemento es una matriz. Necesito hacer algunos cálculos en cada una de estas matrices, y quiero usar el módulo multiprocessing para acelerar las cosas.¿Combinar itertools y multiprocesamiento?

Los resultados de estos cálculos se deben almacenar en una matriz de 256x256x256 como el original, por lo que el resultado de la matriz en el elemento [i,j,k] en la matriz original se debe poner en el elemento de [i,j,k] de la nueva matriz.

Para hacer esto, quiero hacer una lista que podría escribirse en una forma pseudo-ish como [array[i,j,k], (i, j, k)] y pasarla a una función para ser "multiprocesado". Suponiendo que matrices es una lista de todas las matrices extraídos de la matriz original y myfunc es la función de hacer los cálculos, el código sería algo como esto:

import multiprocessing 
import numpy as np 
from itertools import izip 

def myfunc(finput): 
    # Do some calculations... 
    ... 

    # ... and return the result and the index: 
    return (result, finput[1]) 

# Make indices: 
inds = np.rollaxis(np.indices((256, 256, 256)), 0, 4).reshape(-1, 3) 

# Make function input from the matrices and the indices: 
finput = izip(matrices, inds) 

pool = multiprocessing.Pool() 
async_results = np.asarray(pool.map_async(myfunc, finput).get(999999)) 

Sin embargo, parece que map_async es en realidad la creación de este enorme finput -list primero: Mi CPU no está haciendo mucho, pero la memoria y el intercambio se consumen por completo en cuestión de segundos, lo que obviamente no es lo que quiero.

¿Hay alguna manera de pasar esta enorme lista a una función de multiprocesamiento sin la necesidad de crearla explícitamente primero? ¿O conoce otra forma de resolver este problema?

¡Muchas gracias! :-)

+1

Dado que está utilizando 'get()' en 'map_async()', probablemente no desee una operación * asincrónica * y debería usar 'Pool.map()' en su lugar. –

+0

Quizás no entiendo el problema correctamente, pero ¿ha considerado imap o imap_unordered? –

Respuesta

10

Todos los métodos multiprocessing.Pool.map* consumen iteradores completamente (demo code) tan pronto como se llame a la función. Para alimentar a los trozos de función mapa del iterador de un pedazo a la vez, utilizar grouper_nofill:

def grouper_nofill(n, iterable): 
    '''list(grouper_nofill(3, 'ABCDEFG')) --> [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']] 
    ''' 
    it=iter(iterable) 
    def take(): 
     while 1: yield list(itertools.islice(it,n)) 
    return iter(take().next,[]) 

chunksize=256 
async_results=[] 
for finput in grouper_nofill(chunksize,itertools.izip(matrices, inds)): 
    async_results.extend(pool.map_async(myfunc, finput).get()) 
async_results=np.array(async_results) 

PS. El parámetro pool.map_asyncchunksize hace algo diferente: divide lo iterable en fragmentos, luego le da a cada fragmento un proceso de trabajo que llama al map(func,chunk). Esto puede dar al proceso de trabajo más datos para masticar si func(item) termina demasiado rápido, pero no ayuda en su situación ya que el iterador aún se consume completamente inmediatamente después de que se emite la llamada map_async.

+0

¡Muchas gracias! ¡Su solución parece funcionar! Como referencia, tuve que usar pool.map_async (myfunc, finput) .get (999999), ¡pero funciona! Sin embargo, todavía usa mucha memoria (por supuesto, depende del tamaño de trozos exacto), y Python no parece ser una recolección de basura durante la ejecución. ¿Alguna idea de por qué podría ser? – digitaldingo

+0

@digitaldingo: Hm, nada viene a la mente. Sería ideal si puede reducir gradualmente su código a [SSCCE] (http://sscce.org/) y publicarlo aquí. – unutbu

0

Pool.map_async() necesita saber la longitud del iterable para enviar el trabajo a varios trabajadores. Como izip no tiene __len__, primero convierte el iterable en una lista, lo que provoca el enorme uso de memoria que está experimentando.

Puede tratar de eludir esto creando su propio iterador de estilo izip con __len__.

+0

¿por qué necesita saber eso?¿Por qué no puede simplemente alimentar a todos los trabajadores ociosos y la espera? –

+0

@andrew - Las primeras líneas en 'map_async()' ('multiprocesamiento/pool.py') son en realidad' if not hasattr (iterable, '__len__'): iterable = list (iterable) '. Necesita conocer la longitud para crear una lista de salida suficientemente grande ya que se desconoce el orden de terminación de los trabajadores. –

+0

hmmm. podría construir eso dinámicamente, ¿no? Solo estoy pensando que esto podría plantearse como un problema. parece una solicitud válida. –

2

Me encontré con este problema también. en lugar de esto:

res = p.map(func, combinations(arr, select_n)) 

hacer

res = p.imap(func, combinations(arr, select_n)) 

IMAP no consumirlo!