Quiero acelerar un problema embarazosamente paralelo relacionado con la Inferencia Bayesiana. El objetivo es inferir los coeficientes u para un conjunto de imágenes x, dada una matriz A, tal que X = A * U. X tiene dimensiones mxn, A mxp y U pxn. Para cada columna de X, se debe inferir la columna correspondiente óptima de los coeficientes U. Al final, esta información se usa para actualizar A. Utilizo m = 3000, p = 1500 yn = 100. Entonces, como es un modelo lineal, la inferencia de la matriz de coeficientes u consiste en n cálculos independientes. Por lo tanto, traté de trabajar con el módulo de multiprocesamiento de Python, pero no hay velocidad.¿Por qué no hay aceleración cuando se usa el multiprocesamiento de pitones para un problema embarazosamente paralelo dentro de un bucle for, con datos numpy compartidos?
Esto es lo que hice:
La estructura principal, sin paralelización, es:
import numpy as np
from convex import Crwlasso_cd
S = np.empty((m, batch_size))
for t in xrange(start_iter, niter):
## Begin Warm Start ##
# Take 5 gradient steps w/ this batch using last coef. to warm start inf.
for ws in range(5):
# Initialize the coefficients
if ws:
theta = U
else:
theta = np.dot(A.T, X)
# Infer the Coefficients for the given data batch X of size mxn (n=batch_size)
# Crwlasso_cd is the function that does the inference per data sample
# It's basically a C-inline code
for k in range(batch_size):
U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy())
# Given the inferred coefficients, update and renormalize
# the basis functions A
dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood
A += (eta/batch_size) * dA1
A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0))))
Implementación de multiprocesamiento:
Me trataron de poner en práctica el multiprocesamiento. Tengo una máquina de 8 núcleos que puedo usar.
- Hay 3 for-loops. La única que parece ser "paralelizable" es la tercera, donde se infieren los coeficientes:
- Generar una cola y apilar la iteración números de 0 a batch_size-1 en la cola de
- Generar 8 procesos, y dejar que ellos trabajan a través de la cola de
- Compartir los datos T utilizando multiprocessing.Array
por lo tanto, he sustituido este tercer bucle con lo siguiente:
from multiprocessing import Process, Queue
import multiprocessing as mp
from Queue import Empty
num_cpu = mp.cpu_count()
work_queue = Queue()
# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding
# ndarray
U = np.empty((p, batch_size))
U_mp = Wrap_mp(U)
...
# Within the for-loops:
for p in xrange(batch_size):
work_queue.put(p)
processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)]
for p in processes:
p.start()
print p.pid
for p in processes:
p.join()
Aquí es la clase Wrap_mp:
class Wrap_mp(object):
""" Wrapper around multiprocessing.Array to share an array across
processes. Store the array as a multiprocessing.Array, but compute with it
as a numpy.ndarray
"""
def __init__(self, arr):
""" Initialize a shared array from a numpy array.
The data is copied.
"""
self.data = ndarray_to_shmem(arr)
self.dtype = arr.dtype
self.shape = arr.shape
def __array__(self):
""" Implement the array protocole.
"""
arr = shmem_as_ndarray(self.data, dtype=self.dtype)
arr.shape = self.shape
return arr
def asarray(self):
return self.__array__()
Y aquí es la función infer_coefficients_mp:
def infer_feature_coefficients_mp(work_queue,U_mp,A,X):
while True:
try:
index = work_queue.get(block=False)
x = X[:,index]
U = U_mp.asarray()
theta = np.dot(phit,x)
# Infer the coefficients of the column index
U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy())
except Empty:
break
El problema ahora son los siguientes:
- La versión de multiprocesamiento no es más rápida que la versión de un solo subproceso para las dimensiones dadas de los datos.
- Los identificadores de proceso aumentan con cada iteración. ¿Esto significa que constantemente se genera un nuevo proceso? ¿Esto no genera una gran sobrecarga? ¿Cómo puedo evitar eso? ¿Existe la posibilidad de crear dentro de todo el for-loop 8 procesos diferentes y simplemente actualizarlos con los datos?
- ¿La forma en que comparto los coeficientes U entre los procesos ralentiza el cálculo? ¿Hay otra forma mejor de hacer esto?
- ¿Sería mejor un conjunto de procesos?
¡Estoy realmente agradecido por cualquier tipo de ayuda!Empecé a trabajar con Python hace un mes, y ahora estoy bastante perdido.
Engin
¿La planificación del trabajo realmente debe enviarse de a una por vez? ¿No sería justo programar múltiples unidades de trabajo por núcleo por adelantado? Sospecho que si se observan muy pocas mejoras en la agrupación de procesos, se gasta una gran cantidad de tiempo en contención de bloqueo dentro de la Cola de proceso. – cjhanks