Me gustaría saber cómo se hace el multiprocesamiento correctamente. Suponiendo que tengo una lista [1,2,3,4,5]
generada por la función f1
que está escrita en un Queue
(círculo verde a la izquierda). Ahora comienzo dos procesos extrayendo de esa cola (ejecutando f2
en los procesos). Procesan los datos, digamos: doblando el valor, y lo escriben en la segunda fila. Ahora, la función f3
lee estos datos y los imprime.Multiprocesamiento en una tubería hecho bien
Dentro de las funciones que hay una especie de un bucle, tratando de leer desde la cola para siempre. ¿Cómo detengo este proceso?
Idea 1
f1
no sólo se envía la lista, sino también un objeto o un objeto None
custon, class PipelineTerminator: pass
o algo por el que se acaba siendo propagado hasta el fondo. f3
ahora espera a que llegue None
, cuando está allí, se sale del circuito. Problema: es posible que uno de los dos f2
s lea y propague el None
mientras que el otro sigue procesando un número. Entonces el último valor se pierde.
Idea 2
f3
es f1
. De modo que la función f1
genera los datos y las tuberías, genera los procesos con f2
y alimenta todos los datos. Después de desove y alimentación, escucha en la segunda tubería, simplemente contando y procesando los objetos recibidos. Como sabe la cantidad de datos alimentados, puede finalizar los procesos que se ejecutan en f2
. Pero si el objetivo es establecer una canalización de procesamiento, los diferentes pasos deben ser separables. Entonces f1
, f2
y f3
son elementos diferentes de una tubería, y los costosos pasos se realizan en paralelo.
Idea 3
Cada pieza de la tubería es una función, esta función genera procesos, ya que le gusta y es responsable de gestionar ellos. Sabe cuántos datos entraron y cuántos datos se han devuelto (quizás con yield
). Por lo tanto, es seguro propagar un objeto None
.
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(En lugar de utilizar hilos, tal vez se puede pensar en una solución más inteligente.)
Entonces ...
he implementado una solución siguiente idea 2, la alimentación y la espera de los resultados llegaron, pero no era realmente una tubería con funciones independientes conectadas entre sí. Funcionó para la tarea que tenía que administrar, pero era difícil de mantener.
Me gustaría saber cómo se implementan las tuberías (¿es fácil en un proceso con las funciones del generador y demás, pero con múltiples procesos?) Y las gestiona habitualmente.
Pero, ¿cómo deberían los trabajadores en 'f2' * saber * que es el último? 'f1' necesita saber cuántos trabajadores hay y enviar ese número de objetos personalizados. Hecho así, se garantiza que cada trabajador reciba esta notificación. Eso es claramente posible, pero luego no puedo "simplemente enchufar las funciones", necesito saber cuántos trabajadores hay en cada paso. Es por eso que me gusta la idea 3. Y gracias por las cosas 'concurrentes', eso es nuevo para mí y lo investigaré. –
Por eso también marqué "aceptar" :) –
Como el objeto personalizado "dejar de trabajar" se envía con "F1", puede incluir el número total de procesos de trabajo "f2". Si estos simplemente pasan el objeto "parar de trabajar" a "f3", se llega a conocer la cantidad total de trabajadores. Se podría enviar más información de esta manera, así que una cosa importante es tener una "capa de control" al menos en "f3" (pero posiblemente también en "f1") que simplemente se preocupará por esto y simplemente transmitirá cualquier mensaje que no sea " objetos en la cola para ser realmente procesados. – jsbueno