Tengo problemas al usar Pool.map_async() (y también Pool.map()) en el módulo de multiprocesamiento. He implementado una función paralela para bucle que funciona bien siempre que la entrada de función a Pool.map_async sea una función "regular". Cuando la función es, por ejemplo, un método de una clase, cuando me siento un PicklingError:PicklingError al usar multiprocesamiento
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
uso de Python sólo para la computación científica, así que no estoy muy familiarizado con el concepto de decapado, acaban de aprender un poco sobre él hoy. He visto un par de respuestas anteriores, como Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), pero no puedo encontrar la manera de hacerlo funcionar, incluso si sigo el enlace proporcionado en la respuesta.
Mi código, era el objetivo es simular un vector de r.v normales con el uso de múltiples núcleos. Tenga en cuenta que esto es solo un ejemplo y tal vez ni siquiera vale la pena ejecutar en múltiples núcleos.
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
Siguiendo el enlace que aparece en la respuesta a la pregunta en Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map(), Steven Bethard (casi al final) sugiere utilizar el módulo de copy_reg. Su código es:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
Realmente no entiendo cómo puedo hacer uso de esto. Lo único que se me ocurrió fue ponerlo justo antes de mi código, pero no ayudó. Una solución simple es, por supuesto, ir con la que funciona y evitar involucrarse con copy_reg. Estoy más interesado en hacer que copy_reg funcione correctamente para aprovechar al máximo el multiprocesamiento sin tener que resolver el problema cada vez.
Gracias por su ayuda, es muy apreciada.
Matias
Gracias por su respuesta. Tengo una pregunta y estaría muy agradecido si pudiera responder: 1. Usted dice: "(1) cambie su código para que pase una función, y no un método, a los procesos del trabajador, ... ". Esto es lo que estoy haciendo en mi segundo intento, es decir, con la función de prueba() ¿no? Mi pregunta es: si NO estoy aprobando una función, ¿cómo es que funciona? ¿Quieres decir que puedo encontrar errores en el futuro? Probé tu código y funcionó también, pero no veo el punto de "complicar" las cosas si mi primera alternativa ya funcionó. – matiasq
También me gustaría señalar que su alternativa (2) no funcionará para mí, porque mi principal problema es que la clase que estoy utilizando no es seleccionable. Estaba tratando de evitar esto usando copy_reg, que debería ser posible ya que Steve Bethard usó el segundo código que publiqué, y funcionó para él. De nuevo, muchas gracias por su tiempo. – matiasq
En cuanto a mi primera publicación, estaba equivocado. Escribí tu código, pero no tuvo ningún efecto ya que "if isinstance (func, types.MethodType):" nunca fue verdadero y, por lo tanto, el código no se ejecutó. Me disculpo por no haber notado esto antes. – matiasq