2010-09-30 11 views
23

Estoy usando python 2.7 e intento ejecutar algunas tareas pesadas de la CPU en sus propios procesos. Me gustaría poder enviar mensajes al proceso principal para mantenerlo informado del estado actual del proceso. La cola de multiprocesamiento parece perfecta para esto, pero no puedo encontrar la manera de hacerlo funcionar.¿Puedo usar una Cola de multiprocesamiento en una función llamada por Pool.imap?

Por lo tanto, este es mi ejemplo de trabajo básico menos el uso de una cola.

import multiprocessing as mp 
import time 

def f(x): 
    return x*x 

def main(): 
    pool = mp.Pool() 
    results = pool.imap_unordered(f, range(1, 6)) 
    time.sleep(1) 

    print str(results.next()) 

    pool.close() 
    pool.join() 

if __name__ == '__main__': 
    main() 

He intentado pasar la cola de varias maneras, y reciben el mensaje de error "RuntimeError: Los objetos de cola sólo deben ser compartidos entre los procesos a través de la herencia". Esta es una de las formas en que lo intenté en base a una respuesta anterior que encontré. (Me da el mismo problema al intentar utilizar Pool.map_async y Pool.imap)

import multiprocessing as mp 
import time 

def f(args): 
    x = args[0] 
    q = args[1] 
    q.put(str(x)) 
    time.sleep(0.1) 
    return x*x 

def main(): 
    q = mp.Queue() 
    pool = mp.Pool() 
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6))) 

    print str(q.get()) 

    pool.close() 
    pool.join() 

if __name__ == '__main__': 
    main() 

Por último, el enfoque de la aptitud 0 (que sea global) no genera ningún mensaje, sólo encierra.

import multiprocessing as mp 
import time 

q = mp.Queue() 

def f(x): 
    q.put(str(x)) 
    return x*x 

def main(): 
    pool = mp.Pool() 
    results = pool.imap_unordered(f, range(1, 6)) 
    time.sleep(1) 

    print q.get() 

    pool.close() 
    pool.join() 

if __name__ == '__main__': 
    main() 

Soy consciente de que es probable que trabajar con multiprocessing.Process directamente y que hay otras bibliotecas de lograr esto, pero no me gusta a alejarse de las funciones de la biblioteca estándar que son un gran ajuste hasta que me Estoy seguro de que no es solo mi falta de conocimiento lo que me impide ser capaz de explotarlos.

Gracias.

+0

¿Ha considerado usar jar: http://luispedro.org/software/jug? – luispedro

Respuesta

41

El truco es pasar la cola como un argumento para el inicializador. Parece que funciona con todos los métodos de despacho de Pool.

import multiprocessing as mp 

def f(x): 
    f.q.put('Doing: ' + str(x)) 
    return x*x 

def f_init(q): 
    f.q = q 

def main(): 
    jobs = range(1,6) 

    q = mp.Queue() 
    p = mp.Pool(None, f_init, [q]) 
    results = p.imap(f, jobs) 
    p.close() 

    for i in range(len(jobs)): 
     print q.get() 
     print results.next() 

if __name__ == '__main__': 
    main() 
+3

¡Muy buena demostración del propósito y la utilidad de los argumentos 'initializer' y' initargs' en 'multiprocessing.Pool'! –

+0

¿Podría explicar por qué funciona? ¿Qué se agradece cuando haces f.q = q? – kepkin

+2

@kepkin En Python, cada función es un objeto (consulte http://docs.python.org/reference/datamodel.html#the-standard-type-hierarchy Tipos que se pueden llamar). Por lo tanto, f.q está configurando un atributo llamado q en el objeto de función f. Era solo una forma rápida y ligera de guardar el objeto Queue para usarlo más tarde. – Olson

Cuestiones relacionadas