2011-08-07 13 views
43

¿Sería posible crear un conjunto de python que no sea daemonic? Quiero que un grupo pueda llamar a una función que tiene otro grupo dentro. Gracias.Python Process Pool no daemonic?

+0

yo sepa, no, no es posible todo el trabajo en el grupo se endemoniada y no es posible __inject la dependencia__, por cierto, no entiendo la segunda parte de su pregunta 'Quiero que un grupo pueda llamar a una función que tiene otro grupo adentro' y cómo eso interfiere con el hecho de que los trabajadores son daemonizados. – mouad

+1

Porque si la función a tiene un grupo que ejecuta la función b que tiene un grupo que ejecuta la función c, existe un problema en b que se está ejecutando en un proceso de daemon, y los procesos de daemon no pueden crear procesos. 'AssertionError: los procesos daemonic no pueden tener hijos' – Max

Respuesta

68

La clase multiprocessing.pool.Pool crea los procesos de trabajo en su método __init__, los hace daemonic y les comienza, y no es posible volver a establecer su atributo daemon a False antes de que se ponen en marcha (y después no se permite más). Pero puede crear su propia subclase de multiprocesing.pool.Pool (multiprocessing.Pool es solo una función de envoltura) y sustituir su propia subclase multiprocessing.Process, que siempre es no-demoníaca, para ser utilizada en los procesos de trabajo.

Aquí hay un ejemplo completo de cómo hacer esto. Las partes importantes son las dos clases NoDaemonProcess y MyPool en la parte superior y para llamar al pool.close() y pool.join() en su instancia MyPool al final.

#!/usr/bin/env python 
# -*- coding: UTF-8 -*- 

import multiprocessing 
# We must import this explicitly, it is not imported by the top-level 
# multiprocessing module. 
import multiprocessing.pool 
import time 

from random import randint 


class NoDaemonProcess(multiprocessing.Process): 
    # make 'daemon' attribute always return False 
    def _get_daemon(self): 
     return False 
    def _set_daemon(self, value): 
     pass 
    daemon = property(_get_daemon, _set_daemon) 

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool 
# because the latter is only a wrapper function, not a proper class. 
class MyPool(multiprocessing.pool.Pool): 
    Process = NoDaemonProcess 

def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t 

def work(num_procs): 
    print("Creating %i (daemon) workers and jobs in child." % num_procs) 
    pool = multiprocessing.Pool(num_procs) 

    result = pool.map(sleepawhile, 
     [randint(1, 5) for x in range(num_procs)]) 

    # The following is not really needed, since the (daemon) workers of the 
    # child's pool are killed when the child is terminated, but it's good 
    # practice to cleanup after ourselves anyway. 
    pool.close() 
    pool.join() 
    return result 

def test(): 
    print("Creating 5 (non-daemon) workers and jobs in main process.") 
    pool = MyPool(5) 

    result = pool.map(work, [randint(1, 5) for x in range(5)]) 

    pool.close() 
    pool.join() 
    print(result) 

if __name__ == '__main__': 
    test() 
+0

El código anterior parece estar colgando para mí. Específicamente, parece colgarse en pool.close() dentro de work(). ¿Hay algo que me pierdo? –

+1

Acabo de probar mi código nuevamente con Python 2.7/3.2 (después de corregir las líneas de "impresión") en Linux y Python 2.6/2.7/3.2 OS X. Linux y Python 2.7/3.2 en OS X funcionan bien pero el código realmente cuelga con Python 2.6 en OS X (Lion). Esto parece ser un error en el módulo de multiprocesamiento, que se solucionó, pero en realidad no he comprobado el rastreador de errores. –

+0

Esto realmente debería arreglarse en el módulo de multiprocesamiento (una opción para trabajadores no demoníacos debería estar disponible). ¿Alguien sabe quién lo mantiene? –

6

El módulo multiprocessing tiene una interfaz agradable de usar piscinas con procesos o hilos. Dependiendo de su caso de uso actual, puede considerar usar multiprocessing.pool.ThreadPool para su Grupo externo, lo que dará como resultado los hilos (que permiten generar procesos desde dentro) en comparación con los procesos.

podría estar limitado por el GIL, pero en mi caso particular (I ensayaron ambos), el tiempo de inicio para los procesos de la exterior Pool como creado here ampliamente superado la solución con ThreadPool.


Es realmente fácil de intercambiar Processes para Threads. Lea más acerca de cómo usar una solución ThreadPoolhere o here.

0

El problema que encontré fue al tratar de importar globales entre los módulos, lo que hace que la línea ProcessPool() se evalúe varias veces.

globals.py

from processing    import Manager, Lock 
from pathos.multiprocessing import ProcessPool 
from pathos.threading  import ThreadPool 

class SingletonMeta(type): 
    def __new__(cls, name, bases, dict): 
     dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self 
     return super(SingletonMeta, cls).__new__(cls, name, bases, dict) 

    def __init__(cls, name, bases, dict): 
     super(SingletonMeta, cls).__init__(name, bases, dict) 
     cls.instance = None 

    def __call__(cls,*args,**kw): 
     if cls.instance is None: 
      cls.instance = super(SingletonMeta, cls).__call__(*args, **kw) 
     return cls.instance 

    def __deepcopy__(self, item): 
     return item.__class__.instance 

class Globals(object): 
    __metaclass__ = SingletonMeta 
    """  
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children 

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug  
    """ 
    def __init__(self): 
     print "%s::__init__()" % (self.__class__.__name__) 
     self.shared_manager  = Manager() 
     self.shared_process_pool = ProcessPool() 
     self.shared_thread_pool = ThreadPool() 
     self.shared_lock   = Lock()  # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin 

Entonces importar de forma segura desde otras partes de su código

from globals import Globals 
Globals().shared_manager  
Globals().shared_process_pool 
Globals().shared_thread_pool 
Globals().shared_lock