2010-07-20 16 views
140

cuando corro algo como:Multiproceso: ¿Cómo se usa Pool.map en una función definida en una clase?

from multiprocessing import Pool 

p = Pool(5) 
def f(x): 
    return x*x 

p.map(f, [1,2,3]) 

funciona bien. Sin embargo, poner esto como una función de una clase:

class calculate(object): 
    def run(self): 
     def f(x): 
      return x*x 

     p = Pool() 
     return p.map(f, [1,2,3]) 

cl = calculate() 
print cl.run() 

me da el siguiente error:

Exception in thread Thread-1: 
Traceback (most recent call last): 
    File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner 
    self.run() 
    File "/sw/lib/python2.6/threading.py", line 484, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

que he visto un post de Alex Martelli tratar con el mismo tipo de problema, pero no fue lo suficientemente explícito

+1

"esto como una función de una clase"? ¿Puedes publicar el código que realmente obtiene el error real? Sin el código real, solo podemos adivinar lo que estás haciendo mal. –

+1

@ S.Lott publiqué el código – Mermoz

+0

Como observación general, existen módulos de decapado más potentes que el módulo pickle estándar de Python (como el [picloud] (https://pypi.python.org/pypi/cloud/2.7.2) módulo mencionado en [esta respuesta] (http://stackoverflow.com/a/16626757/2292832)). –

Respuesta

57

También me molestaron las restricciones sobre qué tipo de funciones podría aceptar pool.map. Escribí lo siguiente para eludir esto. Parece que funciona, incluso para el uso recursivo de parmap.

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(pipe,x): 
     pipe.send(f(x)) 
     pipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] 
    [p.start() for p in proc] 
    [p.join() for p in proc] 
    return [p.recv() for (p,c) in pipe] 

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5)) 
+1

Esto me ha funcionado muy bien, gracias. He encontrado una debilidad: intenté usar Parmap en algunas funciones que pasaron por un default y obtuve el PicklingError nuevamente. No encontré una solución para esto, simplemente reelaboré mi código para no usar el default. – sans

+2

Esto no funciona en Python 2.7.2 (predeterminado, 12 de junio de 2011, 15:08:59) [MSC v.1500 32 bit (Intel)] en win32 – ubershmekel

+3

Esto funciona en Python 2.7.3 Aug 1,2012 , 05:14:39. Esto no funciona en iterables gigantes -> causa un OSError: [Errno 24] Demasiados archivos abiertos debido a la cantidad de conductos que abre. –

7

Las funciones definidas en las clases (incluso dentro de las funciones dentro de las clases) realmente no saltan. Sin embargo, esto funciona:

def f(x): 
    return x*x 

class calculate(object): 
    def run(self): 
     p = Pool() 
    return p.map(f, [1,2,3]) 

cl = calculate() 
print cl.run() 
+12

gracias, pero me parece un poco sucio definir la función fuera de la clase. La clase debe agrupar todo lo que necesita para lograr una tarea determinada. – Mermoz

+3

@Memoz: "La clase debe agrupar todo lo que necesita" ¿En serio? No puedo encontrar muchos ejemplos de esto. La mayoría de las clases dependen de otras clases o funciones. ¿Por qué llamar a una dependencia de clase "sucia"? ¿Qué pasa con una dependencia? –

+0

Bueno, la función no debería modificar los datos de clase existentes, porque modificaría la versión en el otro proceso, por lo que podría ser un método estático. Puedes ordenar un método estático de la siguiente manera: http://stackoverflow.com/questions/1914261/pickling-a-staticmethod-in-python/1914798#1914798 O, por algo tan trivial, podrías usar un lambda. – robert

39

Actualmente no existe una solución a su problema, por lo que yo sé: la función que le dan a map() debe ser accesible a través de una importación de su módulo. Es por esto que funciona el código de Robert: la función f() se puede obtener mediante la importación el siguiente código:

def f(x): 
    return x*x 

class Calculate(object): 
    def run(self): 
     p = Pool() 
     return p.map(f, [1,2,3]) 

if __name__ == '__main__': 
    cl = Calculate() 
    print cl.run() 

De hecho, me añadió una sección de "principal", porque esto sigue la recommendations for the Windows platform ("Asegúrese de que el módulo principal puede ser importado de forma segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados ").

También agregué una letra mayúscula delante de Calculate, para seguir PEP 8. :)

12

También he tenido problemas con esto. Tenía funciones como miembros de datos de una clase, como un ejemplo simplificado:

from multiprocessing import Pool 
import itertools 
pool = Pool() 
class Example(object): 
    def __init__(self, my_add): 
     self.f = my_add 
    def add_lists(self, list1, list2): 
     # Needed to do something like this (the following line won't work) 
     return pool.map(self.f,list1,list2) 

que necesitaba usar el self.f función en una llamada Pool.map() desde dentro de la misma clase y no lo hice self.f tomar una tupla como argumento. Como esta función estaba incrustada en una clase, no me resultó claro cómo escribir el tipo de envoltorio que sugerían otras respuestas.

Resolví este problema usando un contenedor diferente que toma una tupla/lista, donde el primer elemento es la función, y los elementos restantes son los argumentos para esa función, llamada eval_func_tuple (f_args). Usando esto, la línea problemática puede ser reemplazada por return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aquí está el código completo:

del archivo: util.py

def add(a, b): return a+b 

def eval_func_tuple(f_args): 
    """Takes a tuple of a function and args, evaluates and returns result""" 
    return f_args[0](*f_args[1:]) 

del archivo: main.py

from multiprocessing import Pool 
import itertools 
import util 

pool = Pool() 
class Example(object): 
    def __init__(self, my_add): 
     self.f = my_add 
    def add_lists(self, list1, list2): 
     # The following line will now work 
     return pool.map(util.eval_func_tuple, 
      itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__': 
    myExample = Example(util.add) 
    list1 = [1, 2, 3] 
    list2 = [10, 20, 30] 
    print myExample.add_lists(list1, list2) 

Correr main.py dará [11, 22, 33]. Siéntase libre de mejorar esto, por ejemplo eval_func_tuple también podría ser modificado para tomar argumentos de palabra clave.

En otro aspecto, en otras respuestas, la función "parmap" se puede hacer más eficiente para el caso de más procesos que la cantidad de CPU disponibles. Estoy copiando una versión editada a continuación. Esta es mi primera publicación y no estaba seguro de si debería editar directamente la respuesta original. También cambié el nombre de algunas variables.

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(pipe,x): 
     pipe.send(f(x)) 
     pipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] 
    numProcesses = len(processes) 
    processNum = 0 
    outputList = [] 
    while processNum < numProcesses: 
     endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses) 
     for proc in processes[processNum:endProcessNum]: 
      proc.start() 
     for proc in processes[processNum:endProcessNum]: 
      proc.join() 
     for proc,c in pipe[processNum:endProcessNum]: 
      outputList.append(proc.recv()) 
     processNum = endProcessNum 
    return outputList  

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5))   
17

La solución por mrule es correcta, pero tiene un error: si el niño devuelve una gran cantidad de datos, se puede llenar el buffer de la tubería, el bloqueo en el niño de pipe.send(), mientras que el padre está a la espera para el niño para salir en pipe.join(). La solución es leer los datos del niño antes de join() ing niño. Además, el niño debe cerrar el extremo del tubo de los padres para evitar un punto muerto. El código a continuación arregla eso. También tenga en cuenta que este parmap crea un proceso por elemento en X. Una solución más avanzada es usar multiprocessing.cpu_count() para dividir X en varios trozos, y luego combinar los resultados antes de regresar. Dejo eso como un ejercicio para el lector para no estropear la concisión de la buena respuesta por mrule. ;)

from multiprocessing import Process, Pipe 
from itertools import izip 

def spawn(f): 
    def fun(ppipe, cpipe,x): 
     ppipe.close() 
     cpipe.send(f(x)) 
     cpipe.close() 
    return fun 

def parmap(f,X): 
    pipe=[Pipe() for x in X] 
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)] 
    [p.start() for p in proc] 
    ret = [p.recv() for (p,c) in pipe] 
    [p.join() for p in proc] 
    return ret 

if __name__ == '__main__': 
    print parmap(lambda x:x**x,range(1,5)) 
+0

¿Cómo eliges la cantidad de procesos? –

+0

¡FUNCIONA! Gracias. Ninguna de las otras soluciones funcionó para mí: D –

+0

Sin embargo, muere bastante rápido debido al error 'OSError: [Errno 24] Demasiados archivos abiertos'. Creo que debe haber algún tipo de límite en la cantidad de procesos para que funcione correctamente ... –

69

que no podía usar los códigos publicados hasta el momento debido a que los códigos que utilizan "multiprocessing.Pool" no funcionan con las expresiones lambda y los códigos no se utilice "multiprocessing.Pool" generar la mayor cantidad de procesos, ya que hay artículos de trabajo.

He adaptado el código s.t. genera una cantidad predefinida de trabajadores y solo itera a través de la lista de entrada si existe un trabajador inactivo. También habilité el modo "daemon" para los trabajadores a la vez. ctrl-c funciona como se esperaba.

import multiprocessing 


def fun(f, q_in, q_out): 
    while True: 
     i, x = q_in.get() 
     if i is None: 
      break 
     q_out.put((i, f(x))) 


def parmap(f, X, nprocs=multiprocessing.cpu_count()): 
    q_in = multiprocessing.Queue(1) 
    q_out = multiprocessing.Queue() 

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out)) 
      for _ in range(nprocs)] 
    for p in proc: 
     p.daemon = True 
     p.start() 

    sent = [q_in.put((i, x)) for i, x in enumerate(X)] 
    [q_in.put((None, None)) for _ in range(nprocs)] 
    res = [q_out.get() for _ in range(len(sent))] 

    [p.join() for p in proc] 

    return [x for i, x in sorted(res)] 


if __name__ == '__main__': 
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8])) 
+2

¿Cómo obtendría una barra de progreso para trabajar adecuadamente con esta función 'parmap'? – shockburner

+2

Una pregunta: utilicé esta solución, pero noté que los procesos de python que engendré permanecían activos en la memoria. ¿Alguna idea rápida sobre cómo matarlos cuando sale su parmap? – CompEcon

+1

@ klaus-se Sé que estamos desanimados de solo decir gracias en los comentarios, pero su respuesta es demasiado valiosa para mí, no pude resistirme. Desearía poder darte más que una sola reputación ... – deshtop

34

El multiprocesamiento y decapado está roto y limitado a menos que salte fuera de la biblioteca estándar.

Si utiliza un tenedor de multiprocessing llamado pathos.multiprocesssing, puede usar directamente clases y métodos de clase en las funciones de multiprocesamiento map. Esto se debe a que se usa dill en lugar de pickle o cPickle, y dill puede serializar casi cualquier cosa en python.

pathos.multiprocessing también proporciona una función de mapa asíncrono ... y puede map funciones con múltiples argumentos (por ejemplo map(math.pow, [1,2,3], [4,5,6]))

Ver las discusiones: What can multiprocessing and dill do together?

y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Incluso se encarga de la código que escribió inicialmente, sin modificaciones, y del intérprete. ¿Por qué hacer algo más que sea más frágil y específico para un solo caso?

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> class calculate(object): 
... def run(self): 
... def f(x): 
... return x*x 
... p = Pool() 
... return p.map(f, [1,2,3]) 
... 
>>> cl = calculate() 
>>> print cl.run() 
[1, 4, 9] 

Obtener el código aquí: https://github.com/uqfoundation/pathos

Y, sólo para mostrar un poco más de lo que puede hacer:

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> 
>>> p = Pool(4) 
>>> 
>>> def add(x,y): 
... return x+y 
... 
>>> x = [0,1,2,3] 
>>> y = [4,5,6,7] 
>>> 
>>> p.map(add, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> 
>>> p.map(Test.plus, [t]*4, x, y) 
[4, 6, 8, 10] 
>>> 
>>> res = p.amap(t.plus, x, y) 
>>> res.get() 
[4, 6, 8, 10] 
+0

pathos.multiprocessing también tiene un mapa asincrónico ('amap') que permite el uso de barras de progreso y otra programación asincrónica. –

+0

Me gusta pathos.multiprocessing, que puede servir casi como un reemplazo directo del mapa no paralelo mientras se disfruta del multiprocesamiento. Tengo un contenedor simple de pathos.multiprocessing.map, de modo que es más eficiente en cuanto a la memoria cuando se procesa una gran estructura de datos de solo lectura en varios núcleos, consulte [este repositorio de git] (https://github.com/fashandge/ python_parmap). – Fashandge

+0

Parece interesante, pero no se instala. Este es el mensaje que pip da: 'No se pudo encontrar una versión que satisfaga el requisito pp == 1.5.7-pathos (from pathos)' – xApple

3

he modificado el método de Klaus en sí, porque mientras estaba trabajando para mí con listas pequeñas, se colgaría cuando el número de artículos fuera ~ 1000 o más.En lugar de presionar los trabajos de uno en uno con la condición de detención None, cargo la cola de entrada de una sola vez y dejo que los procesos la mastiquen hasta que esté vacía.

from multiprocessing import cpu_count, Queue, Process 

def apply_func(f, q_in, q_out): 
    while not q_in.empty(): 
     i, x = q_in.get() 
     q_out.put((i, f(x))) 

# map a function using a pool of processes 
def parmap(f, X, nprocs = cpu_count()): 
    q_in, q_out = Queue(), Queue() 
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)] 
    sent = [q_in.put((i, x)) for i, x in enumerate(X)] 
    [p.start() for p in proc] 
    res = [q_out.get() for _ in sent] 
    [p.join() for p in proc] 

    return [x for i,x in sorted(res)] 

Editar: por desgracia ahora estoy ejecutando en este error en mi sistema: Multiprocessing Queue maxsize limit is 32767, esperemos que las soluciones no ayudan.

5

Tomé la respuesta de klaus se's y aganders3, e hice un módulo documentado que es más legible y contiene en un solo archivo. Simplemente puede agregarlo a su proyecto. ¡Incluso tiene una barra de progreso opcional!

""" 
The ``processes`` module provides some convenience functions 
for using parallel processes in python. 

Adapted from http://stackoverflow.com/a/16071616/287297 

Example usage: 

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True) 

Comments: 

"It spawns a predefined amount of workers and only iterates through the input list 
if there exists an idle worker. I also enabled the "daemon" mode for the workers so 
that KeyboardInterupt works as expected." 

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined. 

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess 
""" 

# Modules # 
import multiprocessing 
from tqdm import tqdm 

################################################################################ 
def apply_function(func_to_apply, queue_in, queue_out): 
    while not queue_in.empty(): 
     num, obj = queue_in.get() 
     queue_out.put((num, func_to_apply(obj))) 

################################################################################ 
def prll_map(func_to_apply, items, cpus=None, verbose=False): 
    # Number of processes to use # 
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32) 
    # Create queues # 
    q_in = multiprocessing.Queue() 
    q_out = multiprocessing.Queue() 
    # Process list # 
    new_proc = lambda t,a: multiprocessing.Process(target=t, args=a) 
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)] 
    # Put all the items (objects) in the queue # 
    sent = [q_in.put((i, x)) for i, x in enumerate(items)] 
    # Start them all # 
    for proc in processes: 
     proc.daemon = True 
     proc.start() 
    # Display progress bar or not # 
    if verbose: 
     results = [q_out.get() for x in tqdm(range(len(sent)))] 
    else: 
     results = [q_out.get() for x in range(len(sent))] 
    # Wait for them to finish # 
    for proc in processes: proc.join() 
    # Return results # 
    return [x for i, x in sorted(results)] 

################################################################################ 
def test(): 
    def slow_square(x): 
     import time 
     time.sleep(2) 
     return x**2 
    objs = range(20) 
    squares = prll_map(slow_square, objs, 4, verbose=True) 
    print "Result: %s" % squares 

EDITAR: Añadido @ sugerencia de Alexander-McFarlane y una función de prueba

+0

un problema con su barra de progreso ... La barra solo mide qué tan ineficientemente se dividió la carga de trabajo entre los procesadores. Si la carga de trabajo está perfectamente dividida, todos los procesadores 'se unirán()' al mismo tiempo y obtendrás un flash de '100%' en la pantalla 'tqdm'. El único momento en que será útil es si cada procesador tiene una carga de trabajo sesgada –

+1

mueve 'tqdm()' para ajustar la línea: 'resultado = [q_out.get() para _ en tqdm (enviado)]' y funciona mucho mejor - gran esfuerzo aunque realmente aprecio esto así que +1 –

+0

Gracias por ese consejo, lo intentaré y luego actualizaré la respuesta! – xApple

0

No estoy seguro de si este enfoque se ha adoptado sino un trabajo en torno a que estoy usando es:

from multiprocessing import Pool 

t = None 

def run(n): 
    return t.f(n) 

class Test(object): 
    def __init__(self, number): 
     self.number = number 

    def f(self, x): 
     print x * self.number 

    def pool(self): 
     pool = Pool(2) 
     pool.map(run, range(10)) 

if __name__ == '__main__': 
    t = Test(9) 
    t.pool() 
    pool = Pool(2) 
    pool.map(run, range(10)) 

de salida debe ser:

0 
9 
18 
27 
36 
45 
54 
63 
72 
81 
0 
9 
18 
27 
36 
45 
54 
63 
72 
81 
0
class Calculate(object): 
    # Your instance method to be executed 
    def f(self, x, y): 
    return x*y 

if __name__ == '__main__': 
    inp_list = [1,2,3] 
    y = 2 
    cal_obj = Calculate() 
    pool = Pool(2) 
    results = pool.map(lambda x: cal_obj.f(x, y), inp_list) 

Existe la posibilidad de que desee aplicar esta función para cada instancia diferente de la clase. Entonces aquí es la solución para eso también

class Calculate(object): 
    # Your instance method to be executed 
    def __init__(self, x): 
    self.x = x 

    def f(self, y): 
    return self.x*y 

if __name__ == '__main__': 
    inp_list = [Calculate(i) for i in range(3)] 
    y = 2 
    pool = Pool(2) 
    results = pool.map(lambda x: x.f(y), inp_list) 
3

Sé que esto se le pidió hace más de 6 años, pero sólo quería añadir mi solución, ya que algunas de las sugerencias anteriores parecen terriblemente complicado, pero mi solución era realmente muy sencillo.

Todo lo que tuve que hacer fue ajustar la llamada de pool.map() a una función de ayuda. Pasar el objeto de clase junto con args para el método como una tupla, que se parecía un poco a esto.

def run_in_parallel(args): 
    return args[0].method(args[1]) 

myclass = MyClass() 
method_args = [1,2,3,4,5,6] 
args_map = [ (myclass, arg) for arg in method_args ] 
pool = Pool() 
pool.map(run_in_parallel, args_map) 
0

Aquí está mi solución, que creo que es un poco menos hackish que la mayoría de los otros aquí. Es similar a la respuesta de nightowl.

someclasses = [MyClass(), MyClass(), MyClass()] 

def method_caller(some_object, some_method='the method'): 
    return getattr(some_object, some_method)() 

othermethod = partial(method_caller, some_method='othermethod') 

with Pool(6) as pool: 
    result = pool.map(othermethod, someclasses) 
Cuestiones relacionadas