2008-12-05 10 views
7

Estoy experimentando con el nuevo módulo de multiprocesamiento en Python 2.6. Estoy creando varios procesos, cada uno con su propio multiprocesador. Instancia de QueinableCuenta. Cada proceso genera uno o más subprocesos de trabajo (subclases de threading.Thread) que comparten la instancia de JoinableQueue (transferida a través del método __init__ de cada Thread). Parece que funciona en general, pero de vez en cuando y de manera impredecible se produce el siguiente error:Multiproceso de Python 2.6. ¿Compatible con hilos?

File "C:\Documents and Settings\Brian\Desktop\testscript.py", line 49, in run 
    self.queue.task_done() 
    File "C:\Python26\lib\multiprocessing\queues.py", line 293, in task_done 
    raise ValueError('task_done() called too many times') 
ValueError: task_done() called too many times 

Mi cola get() y task_done() las llamadas son justo después de unos a otros por lo que deben ser iguales. Anecdóticamente esto parece ocurrir solo cuando el trabajo realizado entre get() y task_done() es MUY rápido. Insertar un pequeño time.sleep(0.01) parece aliviar el problema.

¿Alguna idea de lo que está pasando? ¿Puedo usar una cola multiprocesador con hilos en lugar de la cola más tradicional (Queue.Queue)?

Gracias!

-brian

+0

Un extracto de su código que involucre objetos Queue podría ayudar. – jfs

Respuesta

2

Usted debe pasar a la cola de objetos como argumentos de destino.

Ejemplo de multiprocessing's documentation:

from multiprocessing import Process, Queue 

def f(q): 
    q.put([42, None, 'hello']) 

if __name__ == '__main__': 
    q = Queue() 
    p = Process(target=f, args=(q,)) 
    p.start() 
    print q.get() # prints "[42, None, 'hello']" 
    p.join() 

Queues are thread and process safe.

-1

Gracias por la rápida respuesta. Paso las instancias de multiprocesamiento.Queue como argumentos a cada proceso como lo ilustra. La falla parece ocurrir en los hilos. Los estoy creando al crear subclases de subprocesos. Al leer y pasar la cola al método 'init' de cada instancia de subprocesos. Esta parece ser la forma aceptada de pasar en colas para enhebrar subclases. Lo único que pensé es que las colas de multiprocesamiento pueden no ser compatibles con los subprocesos (aunque supuestamente son seguros para subprocesos).

4

No experimenté con multiprocesamiento en 2.6 todavía, pero jugué mucho con pyprocessing (como se llamaba en 2.5).

Veo que está buscando una cantidad de procesos con cada uno engendrando un conjunto de hilos, respectivamente.

dado que está utilizando el módulo de multiprocesamiento, voy a sugerir el uso proceso de múltiples y no enfoque multi hilo, que llegará a menos problemas como bloqueos, etc.

Crear un objeto de cola. http://pyprocessing.berlios.de/doc/queue-objects.html

Para crear un entorno de proceso múltiple use un grupo: http://pyprocessing.berlios.de/doc/pool-objects.html que le administrará los procesos de trabajo. A continuación, puede aplicar asincrónico/sincrónico a los trabajadores y también puede agregar una devolución de llamada para cada trabajador si es necesario. Pero recuerde devolver la llamada es un bloque de código común y debe regresar de inmediato (como se menciona en la documentación)

Alguna información adicional: Si es necesario crear un gestor de http://pyprocessing.berlios.de/doc/manager-objects.html para gestionar el acceso al objeto de cola. Deberá hacer que el objeto de la cola se comparta para esto. Pero la ventaja es que, una vez que se comparte y administra, puede acceder a esta cola compartida en toda la red creando objetos proxy. Esto le permitirá llamar a métodos de un objeto de cola compartido centralizado como (aparentemente) métodos nativos en cualquier nodo de red.

aquí es un ejemplo de código de la documentación

Es posible ejecutar un servidor del gestor en una máquina y tener clientes lo utilizan de otras máquinas (suponiendo que los servidores de seguridad involucrados lo permiten). ejecutando los siguientes comandos crea un servidor para una cola compartida que los clientes remotos pueden utilizar:

>>> from processing.managers import BaseManager, CreatorMethod 
>>> import Queue 
>>> queue = Queue.Queue() 
>>> class QueueManager(BaseManager): 
...  get_proxy = CreatorMethod(callable=lambda:queue, typeid='get_proxy') 
... 
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='none') 
>>> m.serve_forever() 

un cliente puede acceder al servidor de la siguiente manera:

>>> from processing.managers import BaseManager, CreatorMethod 
>>> class QueueManager(BaseManager): 
...  get_proxy = CreatorMethod(typeid='get_proxy') 
... 
>>> m = QueueManager.from_address(address=('foo.bar.org', 50000), authkey='none') 
>>> queue = m.get_proxy() 
>>> queue.put('hello') 

Si insisten en seguros roscado cosas, PEP371 (multiprocesamiento) hace referencia a esto http://code.google.com/p/python-safethread/