2011-08-10 9 views
26

Tengo problemas al usar Pool.map_async() (y también Pool.map()) en el módulo de multiprocesamiento. He implementado una función paralela para bucle que funciona bien siempre que la entrada de función a Pool.map_async sea una función "regular". Cuando la función es, por ejemplo, un método de una clase, cuando me siento un PicklingError:PicklingError al usar multiprocesamiento

cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

uso de Python sólo para la computación científica, así que no estoy muy familiarizado con el concepto de decapado, acaban de aprender un poco sobre él hoy. He visto un par de respuestas anteriores, como Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), pero no puedo encontrar la manera de hacerlo funcionar, incluso si sigo el enlace proporcionado en la respuesta.

Mi código, era el objetivo es simular un vector de r.v normales con el uso de múltiples núcleos. Tenga en cuenta que esto es solo un ejemplo y tal vez ni siquiera vale la pena ejecutar en múltiples núcleos.

import multiprocessing as mp 
import scipy as sp 
import scipy.stats as spstat 

def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None): 
    """ 
    Purpose: Evaluate function using Multiple cores. 

    Input: 
     func  - Function to evaluate in parallel 
     arg  - Array of arguments to evaluate func(arg) 
     static_arg - The "static" argument (if any), i.e. the variables that are  constant in the evaluation of func. 
     nWorkers - Number of Workers to process computations. 
    Output: 
     func(i, static_arg) for i in args. 

    """ 
    # Prepare arguments for func: Collect arguments with static argument (if any) 
    if static_arg != None: 
     arguments = [[arg] + static_arg for arg in list(args)] 
    else: 
     arguments = args 

    # Initialize workers 
    pool = mp.Pool(processes = nWorkers) 

    # Evaluate function 
    result = pool.map_async(func, arguments, chunksize = chunksize) 
    pool.close() 
    pool.join() 

    return sp.array(result.get()).flatten() 

# First test-function. Freeze location and scale for the Normal random variates generator. 
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled 
# so this will give an error. 
def genNorm(loc, scale): 
    def subfunc(a): 
     return spstat.norm.rvs(loc = loc, scale = scale, size = a) 
    return subfunc 

# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be 
# pickled 
def test(fargs): 
    x, a, b = fargs 
    return spstat.norm.rvs(size = x, loc = a, scale = b) 

# Try it out. 
N = 1000000 

# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each 
# element in the output vector. 
args1 = sp.ones(N) 
static_arg = [0, 1] # standarized normal. 

# This gives the PicklingError 
func = genNorm(*static_arg) 
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None) 

# This is OK: 
func = test 
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None) 

Siguiendo el enlace que aparece en la respuesta a la pregunta en Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), Steven Bethard (casi al final) sugiere utilizar el módulo de copy_reg. Su código es:

def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
     else: 
      break 
    return func.__get__(obj, cls) 

import copy_reg 
import types 

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) 

Realmente no entiendo cómo puedo hacer uso de esto. Lo único que se me ocurrió fue ponerlo justo antes de mi código, pero no ayudó. Una solución simple es, por supuesto, ir con la que funciona y evitar involucrarse con copy_reg. Estoy más interesado en hacer que copy_reg funcione correctamente para aprovechar al máximo el multiprocesamiento sin tener que resolver el problema cada vez.

Gracias por su ayuda, es muy apreciada.

Matias

Respuesta

19

El problema aquí es menos de la mensaje de error "salmuera" que conceptual: multiproceso hace bifurcar el código en "trabajadores" diferentes procesos con el fin de realizar su magia.

A continuación, envía datos desde y hacia los diferentes procesos serializando y deserializando los datos de manera uniforme (es decir, la parte que usa el pickle).

Cuando parte de los datos pasan de una función a otra, asume que existe una función con el mismo nombre en el proceso llamado, y (supongo) pasa el nombre de la función, como una cadena. Dado que las funciones son sin estado, el llamado proceso de trabajo simplemente llama a la misma función con los datos que ha recibido. (Las funciones de Python no se pueden serializar mediante pickle, por lo que solo se pasa la referencia entre el proceso maestro y el proceso de trabajo)

Cuando su función es un método en una instancia, aunque cuando codificamos python es muy parecido Lo mismo que una función, con una variable "automática" self, no es lo mismo por debajo. Porque las instancias (objetos) son con estado. Eso significa que el proceso de trabajo no tiene una copia del objeto que es el propietario del método que desea llamar en el otro lado.

No va a funcionar la manera de pasar su método como una función a la llamada map_async, ya que multiprocess solo usa una referencia de función, no la función real al pasarla.

Por lo tanto, debe (1) cambiar su código para pasar una función, y no un método, a los procesos de trabajo, convirtiendo los estados que el objeto mantiene en nuevos parámetros para llamar. (2) Crea una función "objetivo" para la llamada map_async que reconstruye el objeto necesario en el lado del proceso de trabajo y luego llama a la función dentro de ella. La mayoría de las clases sencillas en Python son seleccionables por sí mismas, por lo que podría pasar el objeto que es el propietario de la función en la llamada map_async, y la función "objetivo" llamaría al método apropiado en el lado del trabajador.

(2) puede sonar "difíciles", pero es probable que sea sólo algo como esto - a menos que la clase de su objeto no puede ser decapada:

import types 

def target(object, *args, **kw): 
    method_name = args[0] 
    return getattr(object, method_name)(*args[1:]) 
(...)  
#And add these 3 lines prior to your map_async call: 


    # Evaluate function 
    if isinstance (func, types.MethodType): 
     arguments.insert(0, func.__name__) 
     func = target 
    result = pool.map_async(func, arguments, chunksize = chunksize) 

* Nota: No he probado esta

+0

Gracias por su respuesta. Tengo una pregunta y estaría muy agradecido si pudiera responder: 1. Usted dice: "(1) cambie su código para que pase una función, y no un método, a los procesos del trabajador, ... ". Esto es lo que estoy haciendo en mi segundo intento, es decir, con la función de prueba() ¿no? Mi pregunta es: si NO estoy aprobando una función, ¿cómo es que funciona? ¿Quieres decir que puedo encontrar errores en el futuro? Probé tu código y funcionó también, pero no veo el punto de "complicar" las cosas si mi primera alternativa ya funcionó. – matiasq

+0

También me gustaría señalar que su alternativa (2) no funcionará para mí, porque mi principal problema es que la clase que estoy utilizando no es seleccionable. Estaba tratando de evitar esto usando copy_reg, que debería ser posible ya que Steve Bethard usó el segundo código que publiqué, y funcionó para él. De nuevo, muchas gracias por su tiempo. – matiasq

+0

En cuanto a mi primera publicación, estaba equivocado. Escribí tu código, pero no tuvo ningún efecto ya que "if isinstance (func, types.MethodType):" nunca fue verdadero y, por lo tanto, el código no se ejecutó. Me disculpo por no haber notado esto antes. – matiasq