2011-11-26 13 views
10

Me gustaría saber cómo se hace el multiprocesamiento correctamente. Suponiendo que tengo una lista [1,2,3,4,5] generada por la función f1 que está escrita en un Queue (círculo verde a la izquierda). Ahora comienzo dos procesos extrayendo de esa cola (ejecutando f2 en los procesos). Procesan los datos, digamos: doblando el valor, y lo escriben en la segunda fila. Ahora, la función f3 lee estos datos y los imprime.Multiprocesamiento en una tubería hecho bien

layout of the data flow

Dentro de las funciones que hay una especie de un bucle, tratando de leer desde la cola para siempre. ¿Cómo detengo este proceso?

Idea 1

f1 no sólo se envía la lista, sino también un objeto o un objeto None custon, class PipelineTerminator: pass o algo por el que se acaba siendo propagado hasta el fondo. f3 ahora espera a que llegue None, cuando está allí, se sale del circuito. Problema: es posible que uno de los dos f2 s lea y propague el None mientras que el otro sigue procesando un número. Entonces el último valor se pierde.

Idea 2

f3 es f1. De modo que la función f1 genera los datos y las tuberías, genera los procesos con f2 y alimenta todos los datos. Después de desove y alimentación, escucha en la segunda tubería, simplemente contando y procesando los objetos recibidos. Como sabe la cantidad de datos alimentados, puede finalizar los procesos que se ejecutan en f2. Pero si el objetivo es establecer una canalización de procesamiento, los diferentes pasos deben ser separables. Entonces f1, f2 y f3 son elementos diferentes de una tubería, y los costosos pasos se realizan en paralelo.

Idea 3

pipeline idea 3

Cada pieza de la tubería es una función, esta función genera procesos, ya que le gusta y es responsable de gestionar ellos. Sabe cuántos datos entraron y cuántos datos se han devuelto (quizás con yield). Por lo tanto, es seguro propagar un objeto None.

setup child processes 

execute thread one and two and wait until both finished 

thread 1: 
    while True: 
     pull from input queue 
     if None: break and set finished_flag 
     else: push to queue1 and increment counter1 

thread 2: 
    while True: 
     pull from queue2 
     increment counter2 
     yield result 
     if counter1 == counter2 and finished_flag: break 

when both threads finished: kill process pool and return. 

(En lugar de utilizar hilos, tal vez se puede pensar en una solución más inteligente.)

Entonces ...

he implementado una solución siguiente idea 2, la alimentación y la espera de los resultados llegaron, pero no era realmente una tubería con funciones independientes conectadas entre sí. Funcionó para la tarea que tenía que administrar, pero era difícil de mantener.

Me gustaría saber cómo se implementan las tuberías (¿es fácil en un proceso con las funciones del generador y demás, pero con múltiples procesos?) Y las gestiona habitualmente.

Respuesta

1

¿Cuál sería el problema con el uso de la idea 1, pero con cada proceso de trabajo (f2) poner un objeto personalizado con su identificador cuando se hace?Entonces f3, simplemente terminaría a ese trabajador, hasta que no quedara ningún proceso de trabajo.

Además, como novedad en Python 3.2 es el paquete concurrent.futures en la biblioteca estándar, que debe hacer lo que usted está tratando de la manera "correcta" (tm) - http://docs.python.org/dev/library/concurrent.futures.html

tal vez es posible encuentre un respaldo de concurrent.futures a la serie Python 2.x.

+0

Pero, ¿cómo deberían los trabajadores en 'f2' * saber * que es el último? 'f1' necesita saber cuántos trabajadores hay y enviar ese número de objetos personalizados. Hecho así, se garantiza que cada trabajador reciba esta notificación. Eso es claramente posible, pero luego no puedo "simplemente enchufar las funciones", necesito saber cuántos trabajadores hay en cada paso. Es por eso que me gusta la idea 3. Y gracias por las cosas 'concurrentes', eso es nuevo para mí y lo investigaré. –

+0

Por eso también marqué "aceptar" :) –

+0

Como el objeto personalizado "dejar de trabajar" se envía con "F1", puede incluir el número total de procesos de trabajo "f2". Si estos simplemente pasan el objeto "parar de trabajar" a "f3", se llega a conocer la cantidad total de trabajadores. Se podría enviar más información de esta manera, así que una cosa importante es tener una "capa de control" al menos en "f3" (pero posiblemente también en "f1") que simplemente se preocupará por esto y simplemente transmitirá cualquier mensaje que no sea " objetos en la cola para ser realmente procesados. – jsbueno

1

Por Idea 1, ¿qué tal:

import multiprocessing as mp 

sentinel=None 

def f2(inq,outq): 
    while True: 
     val=inq.get() 
     if val is sentinel: 
      break 
     outq.put(val*2) 

def f3(outq): 
    while True: 
     val=outq.get() 
     if val is sentinel: 
      break 
     print(val) 

def f1(): 
    num_workers=2 
    inq=mp.Queue() 
    outq=mp.Queue() 
    for i in range(5): 
     inq.put(i) 
    for i in range(num_workers):   
     inq.put(sentinel) 
    workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)] 
    printer=mp.Process(target=f3,args=(outq,)) 
    for w in workers: 
     w.start() 
    printer.start() 
    for w in workers: 
     w.join() 
    outq.put(sentinel) 
    printer.join() 

if __name__=='__main__': 
    f1() 

La única diferencia con la descripción de Idea 1 es que f2 escapa de la while-loop cuando recibe el centinela (terminando así mismo). f1 bloques hasta que los trabajadores terminen (usando w.join()) y luego envía f3 el centinela (lo que indica que se salió de su while-loop).

+0

Gracias, eso es similar al enfoque que terminé implementando, pero su versión es muy legible. Lo que no me gusta es el hecho de que cada componente de la tubería necesita saber algo sobre la tubería, como en este caso: 'impresora' necesita saber el número de trabajadores en el paso anterior y así sucesivamente. Es por eso que pensé en encapsular esto y dar * cada * paso en la tubería * exactamente una * entrada y una salida y la ramificación y la fusión tiene lugar en cada paso. –

+0

Ese es un buen punto. Puede hacer que 'f3' sea independiente de' num_workers', pero dejar que 'f1' envíe el centinela después de que' workers' terminen. He editado la publicación para mostrar lo que quiero decir. – unutbu

7

Con MPipe módulo, simplemente hacer esto:

from mpipe import OrderedStage, Pipeline 

def f1(value): 
    return value * 2 

def f2(value): 
    print(value) 

s1 = OrderedStage(f1, size=2) 
s2 = OrderedStage(f2) 
p = Pipeline(s1.link(s2)) 

for task in 1, 2, 3, 4, 5, None: 
    p.put(task) 

Las carreras anteriores 4 procesos:

  • dos para la primera etapa (función f1)
  • uno para la segunda etapa (functi en f2)
  • y uno más para el programa principal que alimenta la tubería.

El MPipe cookbook ofrece una explicación de cómo se cierran los procesos internamente utilizando None como última tarea.

Para ejecutar el código, instale MPipe:

virtualenv venv 
venv/bin/pip install mpipe 
venv/bin/python prog.py 

Salida:

2 
4 
6 
8 
10 
+0

¡Se ve bien, al menos en el ejemplo introductorio! Bonito logo, por cierto. –

0

La manera más fácil de hacer exactamente eso es el uso de semáforos.

F1

F1 está poblando su 'cola' con los datos que desea procesar. Al final de este empujón, pones n palabras clave 'Detener' en tu cola. n = 2 para su ejemplo, pero generalmente la cantidad de trabajadores involucrados. Código se vería así:

for n in no_of_processes: 
    tasks.put('Stop') 

F2

F2 está tirando de la cola proporcionada por un -command get. El elemento se toma de la cola y se elimina en la cola.Ahora, usted puede poner el pop en un bucle, prestando atención a la señal de parada:

for elem in iter(tasks.get, 'STOP'): 
    do something 

F3

Ésta es un poco complicado. Podrías generar un semáforo en F2 que actúa como una señal para F3. Pero no sabes cuándo llega esta señal y puedes perder datos. Sin embargo, F3 extrae los datos de la misma manera que F2 y puede ponerlos en una declaración try... except. queue.get levanta un queue.Empty cuando no hay elementos en la cola. Por lo que su tirón en la F3 se vería así:

while control: 
    try: 
     results.get() 
    except queue.Empty: 
     control = False 

Con tasks y results siendo colas. Entonces no necesitas nada que no esté incluido en Python.

0

Yo uso concurent.futures y tres piscinas, que están conectadas entre sí a través de future.add_done_callback. Luego espero a que termine todo el proceso llamando al shutdown en cada grupo.

from concurrent.futures import ProcessPoolExecutor 
import time 
import random 


def worker1(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe12(future): 
    pool2.submit(worker2, future.result()).add_done_callback(pipe23) 


def worker2(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe23(future): 
    pool3.submit(worker3, future.result()).add_done_callback(spout) 


def worker3(arg): 
    time.sleep(random.random()) 
    return arg 


def spout(future): 
    print(future.result()) 


if __name__ == "__main__": 
    __spec__ = None # Fix multiprocessing in Spyder's IPython 
    pool1 = ProcessPoolExecutor(2) 
    pool2 = ProcessPoolExecutor(2) 
    pool3 = ProcessPoolExecutor(2) 
    for i in range(10): 
     pool1.submit(worker1, i).add_done_callback(pipe12) 
    pool1.shutdown() 
    pool2.shutdown() 
    pool3.shutdown()