2011-09-06 7 views
5

Tengo una aplicación GUI que tiene que ir a buscar y analizar los diversos recursos de la red al lado del bucle principal de la GUI. He buscado opciones utilizando el módulo de multiprocesamiento pitón ya que estos fetch acciones no sólo contienen el bloqueo de IO, pero también contienen análisis pesado, por lo multiprocesamiento puede ser mejor opción aquí que las roscas de pitón. Sería fácil usando Twisted Twisted pero esta vez no es una opción.llamada asincrónica dentro de una aplicación GUI mediante multiprocesamiento

he encontrado una solución simple aquí:

Python subprocess: callback when cmd exits

El problema es que la devolución de llamada mágicamente no se llama dentro de la TrenzadoPrincipal.

Así llego a tener la siguiente solución:

delegate.py

import os 
import multiprocessing as mp 
import signal 
from collections import namedtuple 
import uuid 
import logging 


_CALLBACKS = {} 
_QUEUE = mp.Queue() 

info = logging.getLogger(__name__).info 


class Call(namedtuple('Call', 'id finished result error')): 

    def attach(self, func): 
     if not self.finished: 
      _CALLBACKS.setdefault(self.id, []).append(func) 
     else: 
      func(self.result or self.error) 

     return self 

    def callback(self): 
     assert self.finished, 'Call not finished yet' 
     r = self.result or self.error 
     for func in _CALLBACKS.pop(self.id, []): 
      func(r) 

    def done(self, result=None, error=None): 
     assert not self.finished, 'Call already finished' 
     return self._replace(finished=(-1 if error else 1), 
      result=result, error=error) 

    @classmethod 
    def create(clss): 
     call = clss(uuid.uuid4().hex, 0, None, None) # uuid ??? 
     return call 

def run(q, cb, func, args=None, kwargs=None): 
    info('run: try running %s' % func) 
    try: 
     cb = cb.done(result=func(*(args or()), **(kwargs or {}))) 
    except Exception, err: 
     cb = cb.done(error=err) 
    q.put(cb) 
    os.kill(os.getppid(), signal.SIGUSR2) # SIGUSR2 ??? 
    info('run: leaving') 

def on_callback(sig, frame): 
    info('on_callback: checking queue ...') 
    c = _QUEUE.get(True, 2) 
    info('on_callback: got call - %s' % repr(c)) 
    c.callback() 

signal.signal(signal.SIGUSR2, on_callback) # SIGUSR2 ??? 

def delegate(func, *args, **kwargs): 
    info('delegate: %s %s' % (func, args,)) 
    cb = Call.create() 
    mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start() 
    return cb 


__all__ = ['delegate'] 

uso

from delegate import delegate 

def sleeper(secs): 
    assert secs >= 1, 'I need my Augenpflege' 
    info('sleeper: will go to sleep for %s secs' % secs) 
    sleep(secs) 
    info('sleeper: woke up - returning result') 
    return ['sleeper', 'result'] 

def on_sleeper_result(r): 
    if isinstance(r, Exception): 
     info('on_sleeper_result: got error: %s' % r) 
    else: 
     info('on_sleeper_result: got result: %s' % r) 

from delegate import delegate 
delegate(sleeper, 3).attach(on_sleeper_result) 
delegate(sleeper, -3).attach(on_sleeper_result) 
while 1: 
    info('main: loop') 
    sleep(1) 

salida

0122 08432 MainThread INFO delegate: <function sleeper at 0x163e320> (3,) 
MainThread INFO delegate: <function sleeper at 0x163e320> (-3,) 
0124 08437 MainThread INFO run: try running <function sleeper at 0x163e320> 
0124 08437 MainThread INFO sleeper: will go to sleep for 3 secs 
0124 08432 MainThread INFO main: loop 
0125 08438 MainThread INFO run: try running <function sleeper at 0x163e320> 
0126 08438 MainThread INFO run: leaving 
0126 08432 MainThread INFO on_callback: checking queue ... 
0126 08432 MainThread INFO on_callback: got call - Call(id='057649cba7d840e3825aa5ac73248f78', finished=-1, result=None, error=AssertionError('I need my Augenpflege',)) 
0127 08432 MainThread INFO on_sleeper_result: got error: I need my Augenpflege 
0127 08432 MainThread INFO main: loop 
1128 08432 MainThread INFO main: loop 
2129 08432 MainThread INFO main: loop 
3127 08437 MainThread INFO sleeper: woke up - returning result 
3128 08437 MainThread INFO run: leaving 
3128 08432 MainThread INFO on_callback: checking queue ... 
3129 08432 MainThread INFO on_callback: got call - Call(id='041420c6c83a489aa5c7409c662d4917', finished=1, result=['sleeper', 'result'], error=None) 
3129 08432 MainThread INFO on_sleeper_result: got result: ['sleeper', 'result'] 
3129 08432 MainThread INFO main: loop 
4130 08432 MainThread INFO main: loop 
5132 08432 MainThread INFO main: loop 
... 

Hasta ahora esto funciona bastante bien, pero mi experiencia ce con el módulo de multiprocesamiento son moderados y yo soy un poco inseguro si esto va a funcionar sin efectos. Mi pregunta es - ¿cuáles son las cosas que debería especial cuidado de durante el uso de multiprocesamiento de tal manera ... o hay patrones 'más correctas' para un mecanismo de devolución de llamada asincrónica utilizando el estándar de Python lib?

+0

Eli Bendersky escribió algo al respecto: [link] (http://eli.thegreenplace.net/2011/05/26/code-sample-socket-client-based-on-twisted-with-pyqt /) – JBernardo

+0

Leí esa publicación de blog antes, pero como mencioné Twisted no es una opción para este proyecto, gracias de todos modos – hooblei

Respuesta

2

no hay razón para usar señales (de bajo nivel API) para multiprocesamiento pitón y espera ocupada en el bucle principal.

Usted tiene que ejecutar su ciclo de eventos (modificada) en un QThread, que se puede llamar directamente código QT, o utilizar QApplication.postEvent (o pyqtSignal) para ejecutarlo en el hilo principal

# this should be in the delegate module 
while 1: 
    c = _QUEUE.get(True) # no timeout 
    c.callback() # or post event to main thread 

Puede también vea this page para la discusión sobre la comunicación entre hilos en qt

+0

Aaah - eso es todo - metí todo el código de delegado en un QThread (o mejor QRunnable + QThreadPool) y funciona como un amuleto - gracias – hooblei

1

Su código funciona, pero no es tan simple como podría ser. Avancemos el código.

Esto crea una instancia Call en el proceso principal:

def delegate(func, *args, **kwargs): 
    cb = Call.create() 

pero cuando se pasa cb al proceso de trabajo,

mp.Process(target=run, args=(_QUEUE, cb, func, args, kwargs,)).start() 

la instancia Call se copia durante la os.fork ing, creando así una segunda instancia separada.A continuación, llama cb.done y que llama a cb._replace que devuelve una tercera Call ejemplo:

def done(self, result=None, error=None): 
    assert not self.finished, 'Call already finished' 
    return self._replace(finished=(-1 if error else 1), 
     result=result, error=error) 

Lo anterior llama al método namedtuple privada _replace. Podría haber sido un sentencias de Python simples como

self.finished = -1 if error else 1 

si Call eran una subclase de object en lugar de una subclase de namedtuple. Subclases namedtuple salvó un poco de escribir en __init__ pero se hace muy torpes más adelante ya que necesitamos para modificar los atributos del namedtuple 's ...

Mientras tanto, el original Call instancia devuelta por delegate(...) en el proceso principal llama attach:

delegate(...).attach(on_sleeper_result) 

Esto modifica el _CALLBACKS dict global. Los procesos de trabajo no tienen forma de conocer este cambio en _CALLBACKS; en los procesos de trabajador _CALLBACKS sigue siendo el dict vacío. Por lo tanto, debe pasar la instancia de trabajador Call al proceso principal a través de mp.Queue, que usa cb.id para hacer referencia a las funciones correctas en _CALLBACKS.

Así funciona todo, pero crea tres Call casos por cada llamada a delegate, y el código podría confundir a los no iniciados en el pensamiento de los tres casos Call son todos el mismo objeto .... todo funciona, pero es algo complicado

¿Ha considerado usar el parámetro mp.Pool.apply_asynccallback en su lugar?

import multiprocessing as mp 
import logging 
import time 
import collections 

_CALLBACKS=collections.defaultdict(list) 

logger=mp.log_to_stderr(logging.DEBUG) 

def attach(name,func): 
    _CALLBACKS[name].append(func) 

def delegate(func, *args, **kwargs): 
    id=kwargs.pop('id') 
    try: 
     result=func(*args,**kwargs) 
    except Exception, err: 
     result=err 
    return (id,result) 

def sleeper(secs): 
    assert secs >= 1, 'I need my Augenpflege' 
    logger.info('sleeper: will go to sleep for %s secs' % secs) 
    time.sleep(secs) 
    logger.info('sleeper: woke up - returning result') 
    return ['sleeper', 'result'] 

def callback(r): 
    id,result=r 
    for func in _CALLBACKS[id]: 
     func(result) 

def on_sleeper_result(r): 
    if isinstance(r, Exception): 
     logger.error('on_sleeper_result: got error: %s' % r) 
    else: 
     logger.info('on_sleeper_result: got result: %s' % r) 

if __name__=='__main__': 
    pool=mp.Pool() 
    pool.apply_async(delegate,args=(sleeper, -3),kwds={'id':1}, 
        callback=callback) 
    attach(1,on_sleeper_result) 
    pool.apply_async(delegate,args=(sleeper, 3),kwds={'id':2}, 
        callback=callback) 
    attach(2,on_sleeper_result)  
    while 1: 
     logger.info('main: loop') 
     time.sleep(1) 
+0

Hola, estaba totalmente al tanto de las múltiples instancias de Llamadas al pasar entre los dos procesos y la llamada _replace, pero tomé esto para la legibilidad, ya que estas acciones delegadas serían relativamente raras dentro de la aplicación. Sí, apply_async fue mi primer intento, pero como mencioné, el controlador de devolución de llamada era no llamado dentro del hilo principal actual. Puede ver el mismo comportamiento también aquí: http://stackoverflow.com/questions/2581817/python-subprocess-callback-when-cmd-exits/5209746#5209746 Pero gracias por la aclaración detallada. – hooblei

Cuestiones relacionadas