2012-05-19 6 views
8

Esta es una extensión de mi pregunta reciente Avoiding race conditions in Python 3's multiprocessing Queues. Esperemos que esta versión de la pregunta sea más específica.Multiprocesamiento eficiente de la maximización masiva de fuerza bruta en Python 3

TL; DR: En un modelo de multiprocesamiento donde los procesos de trabajo se alimentan desde una cola utilizando multiprocessing.Queue, ¿por qué están mis procesos de trabajo tan inactivos? Cada proceso tiene su propia cola de entrada, por lo que no están luchando entre sí por el bloqueo de una cola compartida, pero las colas pasan mucho tiempo vacías. El proceso principal está ejecutando un subproceso vinculado a E/S: ¿eso está ralentizando el llenado de CPU de las colas de entrada?

Estoy tratando de encontrar el elemento máximo del producto cartesiano de N conjuntos cada uno con elementos M_i (para 0 < = i < N) bajo una cierta restricción. Recuerde que los elementos del producto cartesiano son tuplas de longitud-N cuyos elementos son elementos de los N conjuntos. Llamaré a estas combinaciones de tuplas para enfatizar el hecho de que estoy repitiendo cada combinación de los conjuntos originales. Una combinación cumple con la restricción cuando mi función is_feasible devuelve True. En mi problema, estoy tratando de encontrar la combinación cuyos elementos tienen el mayor peso: sum(element.weight for element in combination).

El tamaño de mi problema es grande, pero también lo es el servidor de mi empresa. Estoy tratando de reescribir el siguiente algoritmo de serie como un algoritmo paralelo.

from operator import itemgetter 
from itertools import product # Cartesian product function from the std lib 
def optimize(sets): 
    """Return the largest (total-weight, combination) tuple from all 
    possible combinations of the elements in the several sets, subject 
    to the constraint that is_feasible(combo) returns True.""" 
    return max(
       map(
        lambda combination: (
         sum(element.weight for element in combination), 
         combination 
        ), 
        filter(
         is_feasible, # Returns True if combo meets constraint 
         product(*sets) 
        ) 
       ), 
       key=itemgetter(0) # Only maximize based on sum of weight 
      ) 

Mi enfoque multiprocesamiento actual es crear procesos de trabajo y darles de comer combinaciones con una cola de entrada. Cuando los trabajadores reciben un poison pill, colocan la mejor combinación que han visto en una cola de salida y salen. Lleno la cola de entrada desde el hilo principal del proceso principal. Una ventaja de esta técnica es que puedo generar un hilo secundario del proceso principal para ejecutar una herramienta de monitoreo (solo un REPL que puedo usar para ver cuántas combinaciones se han procesado hasta ahora y qué tan completas están las colas).

    +-----------+ 
      in_q0 | worker0 |----\ 
      /-------+-----------+  \ 
+-----------+ in_q1 +-----------+ \ out_q +-----------+ 
| main |-----------| worker1 |-----------| main | 
+-----------+   +-----------+/  +-----------+ 
      \-------+-----------+ /
      in_q2 | worker2 |----/ 
        +-----------+ 

Originalmente tuve todos los trabajadores leyendo de una cola de entrada pero encontré que ninguno de ellos estaba golpeando la CPU. Calculando que estaban pasando todo su tiempo esperando que queue.get() para desbloquear, les di sus propias colas. Eso aumentó la presión en la CPU, así que pensé que los trabajadores estaban activos más a menudo. Sin embargo, ¡las colas pasan la mayor parte del tiempo vacías! (Lo sé por el monitoreo REPL que mencioné). Esto me sugiere que el ciclo principal que llena las colas es lento. Aquí es que bucle:

from itertools import cycle 
main(): 
    # (Create workers, each with its own input queue) 
    # Cycle through each worker's queue and add a combination to that queue 
    for combo, worker in zip(product(*sets), cycle(workers)): 
     worker.in_q.put(combo) 
    # (Collect results and return) 

que supongo es el cuello de botella worker.in_q.put(). ¿Cómo puedo hacer eso más rápido? Mi primer instinto fue hacer a los trabajadores más lentos, pero eso simplemente no tiene sentido ... ¿Es el problema de que el hilo del monitor está deteniendo el ciclo con demasiada frecuencia? ¿Cómo podría decirlo?

Alternativamente, ¿hay otra forma de implementar esto que no implique tanta espera en los bloqueos?

Respuesta

4

¿Cómo son sus elementos? Podría ser que conservarlos para ponerlos en la cola es lento, lo que obviamente sería un cuello de botella. Tenga en cuenta que cada elemento se está encurtiendo independientemente una y otra vez.

Si este es el caso, este enfoque podría ayudar:

  • elegir un conjunto de cardinalidad> = el número de trabajadores. Idealmente, sería mucho más que la cantidad de trabajadores.Llame a este conjunto A y asigne subconjuntos aproximadamente iguales de A a cada trabajador. Transmita ese subconjunto a cada trabajador.
  • Distribuya el contenido completo de todos los juegos distintos de A a cada uno de los trabajadores (probablemente a través de pickle.dumps una vez y luego transmita la misma cadena a cada trabajador, o posiblemente a través de memoria compartida o lo que sea).
  • Luego cada trabajador tiene la información completa que necesita para hacer su subconjunto. Puede comenzar de manera feliz por product(my_A_subset, *other_sets) (posiblemente ordenado de manera diferente), sondeando para algún tipo de señal de parada entre cada trabajo (o cada tres trabajos o lo que sea). Esto no necesita ser a través de una cola, un valor de memoria compartida de un bit funciona bien.
+0

I _think_ Veo lo que dices y, si es así, eso requeriría mucha reescritura. Definitivamente tienes un punto acerca de mi constante '' volcado '' y '' cargando '' los elementos (que son instancias de una subclase de 'collections.Counter'). ¿Ayudaría si los prendiera? Por ejemplo, antes del bucle 'for' en' main', tiene una línea como 'sets_of_pickles = map (lambda s: map (dumps, s), sets)' – wkschwartz

+1

Eso probablemente ayudaría un poco: ahorraría en los costos de decapado , pero aún estarías transmitiendo las cadenas escaneadas a través de IPC una y otra vez en lugar de solo una vez por trabajador. – Dougal

Cuestiones relacionadas