Esta es una extensión de mi pregunta reciente Avoiding race conditions in Python 3's multiprocessing Queues. Esperemos que esta versión de la pregunta sea más específica.Multiprocesamiento eficiente de la maximización masiva de fuerza bruta en Python 3
TL; DR: En un modelo de multiprocesamiento donde los procesos de trabajo se alimentan desde una cola utilizando multiprocessing.Queue
, ¿por qué están mis procesos de trabajo tan inactivos? Cada proceso tiene su propia cola de entrada, por lo que no están luchando entre sí por el bloqueo de una cola compartida, pero las colas pasan mucho tiempo vacías. El proceso principal está ejecutando un subproceso vinculado a E/S: ¿eso está ralentizando el llenado de CPU de las colas de entrada?
Estoy tratando de encontrar el elemento máximo del producto cartesiano de N conjuntos cada uno con elementos M_i (para 0 < = i < N) bajo una cierta restricción. Recuerde que los elementos del producto cartesiano son tuplas de longitud-N cuyos elementos son elementos de los N conjuntos. Llamaré a estas combinaciones de tuplas para enfatizar el hecho de que estoy repitiendo cada combinación de los conjuntos originales. Una combinación cumple con la restricción cuando mi función is_feasible
devuelve True
. En mi problema, estoy tratando de encontrar la combinación cuyos elementos tienen el mayor peso: sum(element.weight for element in combination)
.
El tamaño de mi problema es grande, pero también lo es el servidor de mi empresa. Estoy tratando de reescribir el siguiente algoritmo de serie como un algoritmo paralelo.
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
"""Return the largest (total-weight, combination) tuple from all
possible combinations of the elements in the several sets, subject
to the constraint that is_feasible(combo) returns True."""
return max(
map(
lambda combination: (
sum(element.weight for element in combination),
combination
),
filter(
is_feasible, # Returns True if combo meets constraint
product(*sets)
)
),
key=itemgetter(0) # Only maximize based on sum of weight
)
Mi enfoque multiprocesamiento actual es crear procesos de trabajo y darles de comer combinaciones con una cola de entrada. Cuando los trabajadores reciben un poison pill, colocan la mejor combinación que han visto en una cola de salida y salen. Lleno la cola de entrada desde el hilo principal del proceso principal. Una ventaja de esta técnica es que puedo generar un hilo secundario del proceso principal para ejecutar una herramienta de monitoreo (solo un REPL que puedo usar para ver cuántas combinaciones se han procesado hasta ahora y qué tan completas están las colas).
+-----------+
in_q0 | worker0 |----\
/-------+-----------+ \
+-----------+ in_q1 +-----------+ \ out_q +-----------+
| main |-----------| worker1 |-----------| main |
+-----------+ +-----------+/ +-----------+
\-------+-----------+ /
in_q2 | worker2 |----/
+-----------+
Originalmente tuve todos los trabajadores leyendo de una cola de entrada pero encontré que ninguno de ellos estaba golpeando la CPU. Calculando que estaban pasando todo su tiempo esperando que queue.get() para desbloquear, les di sus propias colas. Eso aumentó la presión en la CPU, así que pensé que los trabajadores estaban activos más a menudo. Sin embargo, ¡las colas pasan la mayor parte del tiempo vacías! (Lo sé por el monitoreo REPL que mencioné). Esto me sugiere que el ciclo principal que llena las colas es lento. Aquí es que bucle:
from itertools import cycle
main():
# (Create workers, each with its own input queue)
# Cycle through each worker's queue and add a combination to that queue
for combo, worker in zip(product(*sets), cycle(workers)):
worker.in_q.put(combo)
# (Collect results and return)
que supongo es el cuello de botella worker.in_q.put()
. ¿Cómo puedo hacer eso más rápido? Mi primer instinto fue hacer a los trabajadores más lentos, pero eso simplemente no tiene sentido ... ¿Es el problema de que el hilo del monitor está deteniendo el ciclo con demasiada frecuencia? ¿Cómo podría decirlo?
Alternativamente, ¿hay otra forma de implementar esto que no implique tanta espera en los bloqueos?
I _think_ Veo lo que dices y, si es así, eso requeriría mucha reescritura. Definitivamente tienes un punto acerca de mi constante '' volcado '' y '' cargando '' los elementos (que son instancias de una subclase de 'collections.Counter'). ¿Ayudaría si los prendiera? Por ejemplo, antes del bucle 'for' en' main', tiene una línea como 'sets_of_pickles = map (lambda s: map (dumps, s), sets)' – wkschwartz
Eso probablemente ayudaría un poco: ahorraría en los costos de decapado , pero aún estarías transmitiendo las cadenas escaneadas a través de IPC una y otra vez en lugar de solo una vez por trabajador. – Dougal