2012-02-09 15 views
6

He estado tratando de paralelizar algunos códigos usando concurrent.futures.ProcessPoolExecutor pero he seguido teniendo interbloqueos extraños que no ocurren con ThreadPoolExecutor. Un ejemplo mínima:Interbloqueo en concurrent.futures código

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     executor.submit(test) 

en Python 3.2.2 (en 64 bits de Ubuntu), esto parece que se cuelga constantemente después de la presentación de todos los puestos de trabajo - y esto parece ocurrir cada vez que el número de trabajos presentados es mayor que la Numero de trabajadores. Si reemplazo ProcessPoolExecutor con ThreadPoolExecutor, funciona sin problemas.

Como un intento de investigar, me dio a cada futuro una devolución de llamada para imprimir el valor de i:

from concurrent import futures 

def test(): 
    pass 

with futures.ProcessPoolExecutor(4) as executor: 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test) 

     def callback(f): 
      print('callback {}'.format(i)) 
     future.add_done_callback(callback) 

Esto sólo me confundió aún más - el valor de i impresa por callback es el valor en el la hora en que se llama, en lugar de en el momento en que se definió (por lo que nunca veo callback 0 pero recibo muchas callback 99 s). Nuevamente, ThreadPoolExecutor imprime el valor esperado.

Preguntándome si esto podría ser un error, intenté con una versión de desarrollo reciente de python. Ahora, el código al menos parece terminar, pero todavía obtengo el valor incorrecto de i impreso.

Así puede alguien explicar:

  • lo que pasó a ProcessPoolExecutor entre Python 3.2 y la versión dev corriente que aparentemente fija este punto muerto

  • por qué se está imprimiendo el valor 'equivocado' de i

EDIT: jukiewicz como se señala más adelante, por supuesto, la impresión se imprimirá i el valor en el momento en que se llama la devolución de llamada, no sé en qué estaba pensando ... si paso un objeto invocable con el valor de i como uno de sus atributos, eso funciona como se esperaba.

EDITAR: un poco más de información: todas las devoluciones de llamada se ejecutan, por lo que parece que es executor.shutdown (llamado por executor.__exit__) que no puede decir que los procesos se han completado. Esto parece estar completamente resuelto en el Python 3.3 actual, pero parece haber habido muchos cambios en multiprocessing y concurrent.futures, así que no sé qué solucionó esto. Como no puedo usar 3.3 (no parece ser compatible con las versiones de lanzamiento o dev de numpy), intenté simplemente copiar sus paquetes de multiprocesamiento y simultáneos en mi instalación 3.2, lo que parece funcionar bien. Aún así, parece un poco extraño que, por lo que puedo ver, ProcessPoolExecutor esté completamente roto en la versión de lanzamiento más reciente, pero nadie más se ve afectado.

+1

En cuanto a la segunda, es natural que los procesos impriman '99'. el símbolo 'i' está vinculado por el contexto global, y la creación de nuevos procesos es costosa, por lo que cuando se ejecuta algo,' i == 99'. – julkiewicz

+1

Además, tengo Ubuntu 64 bits, Python 3.2.2 y el primer fragmento de código no cuelga ... – julkiewicz

+0

@julkiewicz: eso es muy extraño. Acabo de probarlo en una máquina diferente ejecutando Scientific Linux y Python 3.2.2 de 64 bits, y se estancó después de imprimir 'submiting 99' en 10 intentos de cada 10. Incluso intenté envolver el código en 'if __name__ == ' __main __'' como he escuchado que es necesario para multiprocesamiento en Windows. – James

Respuesta

2

Modifiqué el código de la siguiente manera, que resolvió ambos problemas. callback función se definió como un cierre, por lo tanto, utilizaría el valor actualizado de i cada vez. En cuanto al punto muerto, es probable que sea una causa de cierre del Ejecutor antes de que se complete toda la tarea. Esperar a que los futuros se completen también resuelve eso.

from concurrent import futures 

def test(i): 
    return i 

def callback(f): 
    print('callback {}'.format(f.result())) 


with futures.ProcessPoolExecutor(4) as executor: 
    fs = [] 
    for i in range(100): 
     print('submitting {}'.format(i)) 
     future = executor.submit(test, i) 
     future.add_done_callback(callback) 
     fs.append(future) 

    for _ in futures.as_completed(fs): pass 

ACTUALIZACIÓN: oh, lo siento, no he leído sus actualizaciones, parece que ya se han resuelto.

Cuestiones relacionadas