2012-01-10 12 views
136

Lamento no poder reproducir el error con un ejemplo más simple y mi código es demasiado complicado para publicar. Si ejecuto el programa en el shell de IPython en lugar del python normal, las cosas funcionan bien.Error de decapado por multiproceso de Python

He buscado algunas notas anteriores sobre este problema. Todos fueron causados ​​por el uso de la función pool to call definida dentro de una función de clase. Pero este no es el caso para mí.

Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib64/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Agradeceria cualquier ayuda.

ACTUALIZACIÓN: La función I pickle se define en el nivel superior del módulo. Aunque llama a una función que contiene una función anidada. es decir, f() llama a g() llama a h() que tiene una función anidada i(), y estoy llamando a pool.apply_async (f). f(), g(), h() están todos definidos en el nivel superior. Intenté un ejemplo más simple con este patrón y funciona bien.

+1

La respuesta de nivel superior/aceptado es buena, pero podría significar que necesita volver a estructurar el código, lo que podría ser doloroso. Recomendaría a cualquiera que tenga este problema que también lea las respuestas adicionales utilizando 'eneldo' y' pateado'. Sin embargo, no tuve suerte con ninguna de las soluciones al trabajar con vtkobjects :(Cualquiera ha logrado ejecutar el código python en el procesamiento paralelo vtkPolyData? – Chris

Respuesta

178

Aquí hay un list of what can be pickled. En particular, las funciones solo se pueden seleccionar si están definidas en el nivel superior de un módulo.

Este trozo de código:

import multiprocessing as mp 

class Foo(): 
    @staticmethod 
    def work(self): 
     pass 

pool = mp.Pool() 
foo = Foo() 
pool.apply_async(foo.work) 
pool.close() 
pool.join() 

produce un error casi idéntica a la que usted envió:

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

El problema es que los pool métodos todos utilizan un queue.Queue pasar tareas al procesos de trabajo. Todo lo que pasa por el queue.Queue debe ser seleccionable, y foo.work no es seleccionable ya que no está definido en el nivel superior del módulo.

Se puede fijar mediante la definición de una función en el nivel superior, que llama foo.work():

def work(foo): 
    foo.work() 

pool.apply_async(work,args=(foo,)) 

en cuenta que foo es pickable, ya Foo se define en el nivel superior y foo.__dict__ es estibables.

+0

Gracias por su respuesta. Actualicé mi pregunta. No creo que esa sea la causa, aunque – CodeNoob

+5

Para obtener un PicklingError, se debe poner algo en la cola que no se pueda recoger. Podría ser la función o sus argumentos. Para obtener más información sobre el problema, sugiero hacer una copia de su programa y comenzar a reducirlo, convirtiéndolo en Cada vez que vuelva a ejecutar el programa para ver si el problema persiste, cuando sea realmente simple, o bien habrá descubierto el problema usted mismo, o tendrá algo que puede publicar aquí. – unutbu

+1

También: si define una función en el nivel superior de un módulo, pero está decorada, luego la referencia será a la salida del decorador, y obtendrá este error de todos modos. – bobpoekert

1

¿Está pasando una serie nudosa de cuerdas por casualidad?

He tenido este mismo error exacto cuando paso una matriz que contiene una cadena vacía. Creo que puede ser debido a este error: http://projects.scipy.org/numpy/ticket/1658

13

He encontrado que también puedo generar exactamente esa salida de error en una pieza de código perfectamente funcional intentando usar el generador de perfiles en ella.

Tenga en cuenta que esto fue en Windows (donde el bifurcación es un poco menos elegante).

estaba corriendo:

python -m profile -o output.pstats <script> 

y encontró que la eliminación de la elaboración de perfiles elimina el error y la colocación de la elaboración de perfiles restauró. También me estaba volviendo loco porque sabía que el código solía funcionar. Estaba revisando para ver si algo había actualizado pool.py ... luego tuve una sensación de depresión y eliminé el perfil y eso fue todo.

Publicando aquí para los archivos en caso de que alguien más se encuentre con él.

+2

¡GUAU, gracias por mencionar! Me volvía loco durante la última hora más o menos; Intenté todo hasta un ejemplo muy simple: nada parecía funcionar. Pero también hice que el generador de perfiles se ejecutara a través de mi archivo por lotes :( – tim

+0

¡Esto es lo que me sucedió! –

50

Yo usaría pathos.multiprocesssing, en lugar de multiprocessing. pathos.multiprocessing es una horquilla de multiprocessing que usa dill. dill puede serializar casi cualquier cosa en python, por lo que puede enviar mucho más en paralelo. La horquilla pathos también tiene la capacidad de trabajar directamente con múltiples funciones de argumento, como lo necesita para los métodos de clase.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> p = Pool(4) 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> p.map(t.plus, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Foo(object): 
... @staticmethod 
... def work(self, x): 
...  return x+1 
... 
>>> f = Foo() 
>>> p.apipe(f.work, f, 100) 
<processing.pool.ApplyResult object at 0x10504f8d0> 
>>> res = _ 
>>> res.get() 
101 

Get pathos (y si se quiere, dill) aquí: https://github.com/uqfoundation

+2

funcionó bien.Para cualquier otra persona, instalé ambas bibliotecas a través de: 'sudo pip install git + https: // github.com/uqfoundation/dill.git @ master' y ' sudo pip install git + https: //github.com/ uqfoundation/pathos.git @ master' –

+3

@AlexanderMcFarlane No instalaría paquetes de python con 'sudo' (de fuentes externas como github especialmente). En cambio, recomendaría ejecutar: 'pip install --user git + ...' – Chris

+0

El uso de 'pip install pathos' no funciona tristemente y da este mensaje:' No se pudo encontrar una versión que satisfaga el requisito pp == 1.5 .7-pathos (de pathos) ' – xApple

15

Como han dicho otros multiprocessing sólo puede transferir Python se opone a los procesos de trabajo que puede ser decapada. Si no puede reorganizar su código como se describe en unutbu, puede usar las capacidades extendidas de decapado/desempañado de dill para transferir datos (especialmente datos de código) como se muestra a continuación.

Esta solución sólo requiere la instalación de dill y no otras bibliotecas como pathos:

import os 
from multiprocessing import Pool 

import dill 


def run_dill_encoded(payload): 
    fun, args = dill.loads(payload) 
    return fun(*args) 


def apply_async(pool, fun, args): 
    payload = dill.dumps((fun, args)) 
    return pool.apply_async(run_dill_encoded, (payload,)) 


if __name__ == "__main__": 

    pool = Pool(processes=5) 

    # asyn execution of lambda 
    jobs = [] 
    for i in range(10): 
     job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) 
     jobs.append(job) 

    for job in jobs: 
     print job.get() 
    print 

    # async execution of static method 

    class O(object): 

     @staticmethod 
     def calc(): 
      return os.getpid() 

    jobs = [] 
    for i in range(10): 
     job = apply_async(pool, O.calc,()) 
     jobs.append(job) 

    for job in jobs: 
     print job.get() 
+3

Soy el autor de 'eneldo' y 'pathos' ... y mientras tienes razón, ¿no es mucho más agradable y más limpio? y más flexible para usar también 'pathos' como en mi respuesta? O tal vez estoy un poco parcial ... –

+3

No estaba al tanto del estado de '' pathos'' en el momento de escribir y quería presentar una solución que está muy cerca de la respuesta. Ahora que he visto su solución, estoy de acuerdo en que este es el camino a seguir. – rocksportrocker

+0

Leí su solución y dije 'Doh ... Ni siquiera pensé en hacerlo así'. Así que fue genial. –

4

This solution requires only the installation of dill and no other libraries as pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),): 
    """ 
    Unpack dumped function as target function and call it with arguments. 

    :param (dumped_function, item, args, kwargs): 
     a tuple of dumped function and its arguments 
    :return: 
     result of target function 
    """ 
    target_function = dill.loads(dumped_function) 
    res = target_function(item, *args, **kwargs) 
    return res 


def pack_function_for_map(target_function, items, *args, **kwargs): 
    """ 
    Pack function and arguments to object that can be sent from one 
    multiprocessing.Process to another. The main problem is: 
     «multiprocessing.Pool.map*» or «apply*» 
     cannot use class methods or closures. 
    It solves this problem with «dill». 
    It works with target function as argument, dumps it («with dill») 
    and returns dumped function with arguments of target function. 
    For more performance we dump only target function itself 
    and don't dump its arguments. 
    How to use (pseudo-code): 

     ~>>> import multiprocessing 
     ~>>> images = [...] 
     ~>>> pool = multiprocessing.Pool(100500) 
     ~>>> features = pool.map(
     ~...  *pack_function_for_map(
     ~...   super(Extractor, self).extract_features, 
     ~...   images, 
     ~...   type='png' 
     ~...   **options, 
     ~... ) 
     ~...) 
     ~>>> 

    :param target_function: 
     function, that you want to execute like target_function(item, *args, **kwargs). 
    :param items: 
     list of items for map 
    :param args: 
     positional arguments for target_function(item, *args, **kwargs) 
    :param kwargs: 
     named arguments for target_function(item, *args, **kwargs) 
    :return: tuple(function_wrapper, dumped_items) 
     It returs a tuple with 
      * function wrapper, that unpack and call target function; 
      * list of packed target function and its' arguments. 
    """ 
    dumped_function = dill.dumps(target_function) 
    dumped_items = [(dumped_function, item, args, kwargs) for item in items] 
    return apply_packed_function_for_map, dumped_items 

También funciona para matrices numpy.

0
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Este error también aparecerá si tiene alguna función incorporada dentro del objeto modelo que se pasó al trabajo asincrónico.

Así que asegúrese de comprobar objetos modelo que se pasan no tiene funciones incorporadas. (En nuestro caso, estábamos usando la función FieldTracker() de django-model-utils dentro del modelo para rastrear un determinado campo). Aquí está el link al problema relevante de GitHub.

Cuestiones relacionadas