2009-09-24 6 views
7

Entonces, tengo una aplicación que usa Twisted + Stomper como un cliente STOMP que cultiva trabajo en multiprocesamiento. Piscina de trabajadores.cliente de red twisted con multiprocesamiento de los trabajadores?

Esto parece funcionar bien cuando sólo tiene que utilizar un script en Python para disparar con esto, que (simplificado) se ve algo como esto:

# stompclient.py 

logging.config.fileConfig(config_path) 
logger = logging.getLogger(__name__) 

# Add observer to make Twisted log via python 
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool. (child processes get forked off immediately) 
pool = multiprocessing.Pool(processes=processes) 

StompClientFactory.username = username 
StompClientFactory.password = password 
StompClientFactory.destination = destination 
reactor.connectTCP(host, port, StompClientFactory()) 
reactor.run() 

Como esto está empaquetado para su despliegue, pensé que iba a aprovechar del script twistd y ejecútelo desde un archivo tac.

Aquí está mi archivo tac muy similares de aspecto:

# stompclient.tac 

logging.config.fileConfig(config_path) 
logger = logging.getLogger(__name__) 

# Add observer to make Twisted log via python 
twisted.python.log.PythonLoggingObserver().start() 

# initialize the process pool. (child processes get forked off immediately) 
pool = multiprocessing.Pool(processes=processes) 

StompClientFactory.username = username 
StompClientFactory.password = password 
StompClientFactory.destination = destination 

application = service.Application('myapp') 

service = internet.TCPClient(host, port, StompClientFactory()) 
service.setServiceParent(application) 

En aras de la ilustración, he derrumbado o cambiado algunos detalles; es de esperar que no fueran la esencia del problema. Por ejemplo, mi aplicación tiene un sistema de complementos, el grupo se inicializa mediante un método independiente y luego el trabajo se delega en el grupo utilizando pool.apply_async() pasando uno de los métodos de proceso() de mi complemento.

Por lo tanto, si ejecuto el script (stompclient.py), todo funciona como se esperaba.

También parece funcionar bien si me quedo giro en el modo de no-daemon (-n): sin embargo

twistd -noy stompclient.tac 

, no lo hace trabajo cuando corro en modo demonio:

twistd -oy stompclient.tac 

Parece que la aplicación se inicia correctamente, pero cuando intenta interrumpir el trabajo, simplemente se bloquea. Por "cuelga", quiero decir que parece que nunca se le pide al proceso hijo que haga nada y el padre (que se llama pool.apply_async()) simplemente se sienta allí esperando que la respuesta regrese.

Estoy seguro de que estoy haciendo algo estúpido con el multiprocesamiento Twisted +, pero realmente espero que alguien pueda explicarme el error en mi enfoque.

¡Gracias de antemano!

Respuesta

12

Como la diferencia entre su invocación de trabajo y su invocación no laborable es solo la opción "-n", parece más probable que el problema esté causado por el proceso de daemonización (que "-n" evita que ocurra).

En POSIX, uno de los pasos implicados en la daemonización es bifurcar y hacer que el padre salga. Entre otras cosas, esto tiene la consecuencia de que su código se ejecute en un proceso diferente al que se evaluó el archivo .tac. Esto también reorganiza la relación hijo/padre de los procesos que se iniciaron en el archivo .tac, como su conjunto de procesos de multiprocesamiento.

Los procesos del grupo de multiprocesamiento comienzan con uno de los padres del proceso twistd que inicia. Sin embargo, cuando ese proceso sale como parte de la daemonización, su padre se convierte en el proceso de inicio del sistema. Esto puede causar algunos problemas, aunque probablemente no sea el problema colgante que describió.Probablemente existan otros detalles de implementación de bajo nivel que normalmente permiten el funcionamiento del módulo de multiprocesamiento pero que se ven afectados por el proceso de daemonización.

Afortunadamente, evitar esta extraña interacción debería ser sencillo. Las API de servicio de Twisted te permiten ejecutar código después de que la daemonización se haya completado. Si usa estas API, puede retrasar la inicialización del grupo de procesos del módulo de multiprocesamiento hasta después de la daemonización y con suerte evitar el problema. Aquí está un ejemplo de lo que podría ser:

from twisted.application.service import Service 

class MultiprocessingService(Service): 
    def startService(self): 
     self.pool = multiprocessing.Pool(processes=processes) 

MultiprocessingService().setServiceParent(application) 

Ahora, por separado, también puede tener problemas relacionados con la limpieza de procesos hijo del módulo de multiprocesamiento, o, posiblemente, problemas con los procesos creados con la API de creación de procesos de trenzado, reactor.spawnProcess. Esto se debe a que una parte del tratamiento de los procesos secundarios correctamente generalmente implica el manejo de la señal SIGCHLD. Sin embargo, el trenzado y el multiprocesamiento no cooperarán en este sentido, por lo que a uno de ellos se le notificará que todos los niños saldrán y el otro nunca será notificado. Si no usas la API de Twisted para crear procesos secundarios, entonces esto puede ser bueno para ti, pero es posible que quieras verificar que cualquier controlador de señal que el módulo de multiprocesamiento intente instalar en realidad "gana" y no obtiene reemplazado por el propio controlador de Twisted.

+2

Esto fue * extremadamente * útil. ¡Gracias! –

0

Una posible idea para usted ...

Cuando se ejecuta en modo demonio twistd cerrará stdin, stdout y stderr. ¿Algo que tus clientes leen o escriben?

+0

Nada debería estar escrito para ellos (y mi logging irá al syslog) pero me pregunto si tal vez algún error de bajo nivel está intentando ir a stderr. Puede ser frustrante intentar depurar en silencio :) –

Cuestiones relacionadas