2010-12-07 10 views
15

Quiero acelerar un problema embarazosamente paralelo relacionado con la Inferencia Bayesiana. El objetivo es inferir los coeficientes u para un conjunto de imágenes x, dada una matriz A, tal que X = A * U. X tiene dimensiones mxn, A mxp y U pxn. Para cada columna de X, se debe inferir la columna correspondiente óptima de los coeficientes U. Al final, esta información se usa para actualizar A. Utilizo m = 3000, p = 1500 yn = 100. Entonces, como es un modelo lineal, la inferencia de la matriz de coeficientes u consiste en n cálculos independientes. Por lo tanto, traté de trabajar con el módulo de multiprocesamiento de Python, pero no hay velocidad.¿Por qué no hay aceleración cuando se usa el multiprocesamiento de pitones para un problema embarazosamente paralelo dentro de un bucle for, con datos numpy compartidos?

Esto es lo que hice:

La estructura principal, sin paralelización, es:

import numpy as np 
from convex import Crwlasso_cd 

S = np.empty((m, batch_size)) 

for t in xrange(start_iter, niter): 

    ## Begin Warm Start ## 
    # Take 5 gradient steps w/ this batch using last coef. to warm start inf. 
    for ws in range(5): 
     # Initialize the coefficients 
     if ws: 
      theta = U 
     else: 
      theta = np.dot(A.T, X) 

     # Infer the Coefficients for the given data batch X of size mxn (n=batch_size) 
     # Crwlasso_cd is the function that does the inference per data sample 
     # It's basically a C-inline code 
     for k in range(batch_size): 
      U[:,k] = Crwlasso_cd(X[:, k].copy(), A, theta=theta[:,k].copy()) 

     # Given the inferred coefficients, update and renormalize 
     # the basis functions A 
     dA1 = np.dot(X - np.dot(A, U), U.T) # Gaussian data likelihood 
     A += (eta/batch_size) * dA1 
     A = np.dot(A, np.diag(1/np.sqrt((A**2).sum(axis=0)))) 

Implementación de multiprocesamiento:

Me trataron de poner en práctica el multiprocesamiento. Tengo una máquina de 8 núcleos que puedo usar.

  1. Hay 3 for-loops. La única que parece ser "paralelizable" es la tercera, donde se infieren los coeficientes:
    • Generar una cola y apilar la iteración números de 0 a batch_size-1 en la cola de
    • Generar 8 procesos, y dejar que ellos trabajan a través de la cola de
  2. Compartir los datos T utilizando multiprocessing.Array

por lo tanto, he sustituido este tercer bucle con lo siguiente:

from multiprocessing import Process, Queue 
import multiprocessing as mp 
from Queue import Empty 

num_cpu = mp.cpu_count() 
work_queue = Queue() 

# Generate the empty ndarray U and a multiprocessing.Array-Wrapper U_mp around U 
# The class Wrap_mp is attached. Basically, U_mp.asarray() gives the corresponding 
# ndarray 
U = np.empty((p, batch_size)) 
U_mp = Wrap_mp(U) 

... 

     # Within the for-loops: 
     for p in xrange(batch_size): 
     work_queue.put(p) 

     processes = [Process(target=infer_coefficients_mp, args=(work_queue,U_mp,A,X)) for p in range(num_cpu)] 

     for p in processes: 
      p.start() 
      print p.pid 
     for p in processes: 
      p.join() 

Aquí es la clase Wrap_mp:

class Wrap_mp(object): 
""" Wrapper around multiprocessing.Array to share an array across 
    processes. Store the array as a multiprocessing.Array, but compute with it 
as a numpy.ndarray 
""" 

    def __init__(self, arr): 
     """ Initialize a shared array from a numpy array. 

      The data is copied. 
     """ 
     self.data = ndarray_to_shmem(arr) 
     self.dtype = arr.dtype 
     self.shape = arr.shape 

    def __array__(self): 
     """ Implement the array protocole. 
     """ 
     arr = shmem_as_ndarray(self.data, dtype=self.dtype) 
     arr.shape = self.shape 
     return arr 

    def asarray(self): 
     return self.__array__() 

Y aquí es la función infer_coefficients_mp:

def infer_feature_coefficients_mp(work_queue,U_mp,A,X): 

    while True: 
     try: 
      index = work_queue.get(block=False) 
      x = X[:,index] 
      U = U_mp.asarray() 
      theta = np.dot(phit,x) 

      # Infer the coefficients of the column index 
      U[:,index] = Crwlasso_cd(x.copy(), A, theta=theta.copy()) 

     except Empty: 
      break 

El problema ahora son los siguientes:

  1. La versión de multiprocesamiento no es más rápida que la versión de un solo subproceso para las dimensiones dadas de los datos.
  2. Los identificadores de proceso aumentan con cada iteración. ¿Esto significa que constantemente se genera un nuevo proceso? ¿Esto no genera una gran sobrecarga? ¿Cómo puedo evitar eso? ¿Existe la posibilidad de crear dentro de todo el for-loop 8 procesos diferentes y simplemente actualizarlos con los datos?
  3. ¿La forma en que comparto los coeficientes U entre los procesos ralentiza el cálculo? ¿Hay otra forma mejor de hacer esto?
  4. ¿Sería mejor un conjunto de procesos?

¡Estoy realmente agradecido por cualquier tipo de ayuda!Empecé a trabajar con Python hace un mes, y ahora estoy bastante perdido.

Engin

+0

¿La planificación del trabajo realmente debe enviarse de a una por vez? ¿No sería justo programar múltiples unidades de trabajo por núcleo por adelantado? Sospecho que si se observan muy pocas mejoras en la agrupación de procesos, se gasta una gran cantidad de tiempo en contención de bloqueo dentro de la Cola de proceso. – cjhanks

Respuesta

5

Cada vez que se crea un proceso que va a crear un nuevo proceso. Si está haciendo eso dentro de su bucle for, entonces sí, está iniciando nuevos procesos cada vez que pasa el ciclo. Parece que lo que quieres hacer es inicializar tu Cola y Procesos fuera del ciclo, luego llena la Cola dentro del ciclo.

He usado multiprocesamiento.Pool antes, y es útil, pero no ofrece mucho más de lo que ya ha implementado con una cola.

+0

¡Gracias por tu respuesta! Pero ¿cómo se pueden crear procesos fuera del ciclo, y luego simplemente actualizar las variables dadas y sincronizar los resultados sin p.join()? Porque la función de unión está cerrando el proceso, ¿verdad? –

+1

p.join simplemente espera que el proceso salga, lo que hará si llama a sys.exit o regresa de la función. Una vez que llame, comience en el proceso, se está ejecutando en paralelo con el proceso principal. Cuando los procesos del trabajador llamen work_queue.get(), bloquearán hasta que haya una entrada en la cola de trabajo para que ellos los lleven. Cuando llama a work_queue.put() en el proceso principal, alimenta el trabajo a la cola, y puede continuar haciéndolo hasta que finalice. –

+0

Por lo tanto, lo siento, pero para que quede claro: Inicializo los procesos fuera del ciclo principal con procesos = [Proceso (...) para ...], y dentro del ciclo, tan pronto como lo complete la cola por primera vez, inicie los procesos solo una vez, los dejo trabajar a través de la cola hasta que esté vacía. Luego, en la segunda iteración, llené la cola nuevamente. Ahora, los procesos funcionarán automáticamente a través de la cola, sin que tenga que activarlos de nuevo de ninguna manera, y ¿actualizarán los objetos de la memoria compartida? –

3

Eventualmente, todo esto se reduce a una pregunta: ¿es posible iniciar procesos fuera del bucle for principal, y para cada iteración, alimentar las variables actualizadas en ellos, hacer que procesen los datos, y recopilar los nuevos datos calculados de todos los procesos, sin tener que iniciar nuevos procesos en cada iteración?

Cuestiones relacionadas