2009-11-04 20 views
38

Tengo una matriz de datos muy grande (solo lectura) que quiero que varios procesos procesen en paralelo.¿Cómo combinar Pool.map con Array (memoria compartida) en el multiproceso de Python?

Me gusta la función Pool.map y me gustaría usarla para calcular funciones en esos datos en paralelo.

Vi que se puede usar la clase Value o Array para usar datos de memoria compartida entre procesos. Pero cuando trato de utilizar este recibo una RuntimeError: 'objetos SynchronizedString sólo deben ser compartidos entre los procesos a través de la herencia cuando se utiliza la función Pool.map:

Aquí es un ejemplo simplificado de lo que yo estoy tratando de hacer:

from sys import stdin 
from multiprocessing import Pool, Array 

def count_it(arr, key): 
    count = 0 
    for c in arr: 
    if c == key: 
     count += 1 
    return count 

if __name__ == '__main__': 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    # want to share it using shared memory 
    toShare = Array('c', testData) 

    # this works 
    print count_it(toShare, "a") 

    pool = Pool() 

    # RuntimeError here 
    print pool.map(count_it, [(toShare,key) for key in ["a", "b", "s", "d"]]) 

¿Alguien puede decirme qué estoy haciendo mal aquí?

Lo que me gustaría hacer es pasar información sobre una matriz asignada de memoria compartida recientemente creada a los procesos después de que se hayan creado en el grupo de procesos.

+1

Desafortunadamente eso no es posible. La forma recomendada según la documentación de mp es usar inheritence (en plataformas fork). Para los datos de solo lectura que tiene aquí, uno normalmente usaría un global, pero puede usar una matriz compartida para la comunicación de lectura/escritura. La bifurcación es barata, por lo que puede recrear el Pool cada vez que recibe los datos, luego ciérrelo. Desafortunadamente, en Windows esto no es posible: la solución consiste en utilizar una matriz de memoria compartida (incluso en el caso de solo lectura) pero esto solo se puede pasar a los subprocesos en la creación del proceso (imagino que deben agregarse a la lista de acceso) ... – robince

+0

para el segmento de memoria compartida y que esta lógica no se implementa excepto en el inicio del subproceso). Puede pasar la matriz de datos compartidos en el inicio del grupo como lo mostré, o a un proceso de manera similar. No puede pasar una matriz de memoria compartida a un grupo abierto; debe crear el grupo después de la memoria. Maneras fáciles de solucionar esto incluyen asignar un búfer de tamaño máximo, o simplemente asignar el arreglo cuando se conoce el tamaño requerido antes de iniciar el Pool. Si mantiene bajas sus variables globales, el pool tampoco debería ser demasiado caro en Windows: las variables globales son automáticamente ... – robince

+0

escabeche y enviadas a los subprocesos, por lo que sugiero que haga un buffer de tamaño suficiente al inicio (donde, con suerte, su cantidad de variables globales es pequeña), entonces Pool, es mejor. Me tomé el tiempo para entender y resolver su problema de buena fe, antes de editar su pregunta, así que aunque comprendo que si desea dejarlo funcionar, espero que al final considere aceptar mi respuesta si nada sustancialmente diferente/mejor llega. a lo largo. – robince

Respuesta

35

Tratando de nuevo como acabo de ver la generosidad;)

Básicamente creo que el mensaje de error significa lo que dijo - multiprocesamiento compartida matrices de memoria no puede pasado como argumentos (por decapado). No tiene sentido serializar los datos; el punto es que los datos son memoria compartida. Entonces tienes que hacer que la matriz compartida sea global. Creo que es mejor ponerlo como el atributo de un módulo, como en mi primera respuesta, pero simplemente dejarlo como una variable global en tu ejemplo también funciona bien. Tomando en cuenta su punto de no querer establecer los datos antes del tenedor, aquí hay un ejemplo modificado. Si quisiera tener más de una posible matriz compartida (y es por eso que quería pasar aShare como argumento) podría hacer una lista global de matrices compartidas, y simplemente pasar el índice a count_it (que se convertiría en for c in toShare[i]:).

from sys import stdin 
from multiprocessing import Pool, Array, Process 

def count_it(key): 
    count = 0 
    for c in toShare: 
    if c == key: 
     count += 1 
    return count 

if __name__ == '__main__': 
    # allocate shared array - want lock=False in this case since we 
    # aren't writing to it and want to allow multiple processes to access 
    # at the same time - I think with lock=True there would be little or 
    # no speedup 
    maxLength = 50 
    toShare = Array('c', maxLength, lock=False) 

    # fork 
    pool = Pool() 

    # can set data after fork 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    if len(testData) > maxLength: 
     raise ValueError, "Shared array too small to hold data" 
    toShare[:len(testData)] = testData 

    print pool.map(count_it, ["a", "b", "s", "d"]) 

[EDIT: Lo anterior no funciona en las ventanas, porque de no usar tenedor. Sin embargo, el siguiente no funciona en Windows, sigue utilizando la piscina, así que creo que esto es lo más cercano a lo que quiere:

from sys import stdin 
from multiprocessing import Pool, Array, Process 
import mymodule 

def count_it(key): 
    count = 0 
    for c in mymodule.toShare: 
    if c == key: 
     count += 1 
    return count 

def initProcess(share): 
    mymodule.toShare = share 

if __name__ == '__main__': 
    # allocate shared array - want lock=False in this case since we 
    # aren't writing to it and want to allow multiple processes to access 
    # at the same time - I think with lock=True there would be little or 
    # no speedup 
    maxLength = 50 
    toShare = Array('c', maxLength, lock=False) 

    # fork 
    pool = Pool(initializer=initProcess,initargs=(toShare,)) 

    # can set data after fork 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    if len(testData) > maxLength: 
     raise ValueError, "Shared array too small to hold data" 
    toShare[:len(testData)] = testData 

    print pool.map(count_it, ["a", "b", "s", "d"]) 

No sé por qué mapa no se Conserve en vinagre la matriz, pero Proceso y piscina habrá - Creo quizás se haya transferido en el punto de la inicialización del subproceso en Windows. Tenga en cuenta que los datos aún están configurados después de la horquilla.

+0

Incluso en plataformas con fork no se pueden insertar nuevos datos compartidos en toShare después de la bifurcación ya que cada proceso tendrá su propia copia independiente en ese punto. –

+0

Así que el verdadero problema parece ser que la forma en que podemos conservar en vinagre la información sobre una matriz para que se pueda enviar y conectado a del otro proceso. –

+0

@James: no, eso no está bien. La matriz debe configurarse antes de la bifurcación, pero luego se puede modificar la memoria compartida, con cambios visibles en todos los elementos secundarios. Mire el ejemplo: coloco los datos en la matriz * después de * la horquilla (que ocurre cuando se crea una instancia de Pool()). Esa información podría obtenerse en tiempo de ejecución, después de la bifurcación, y siempre que encaje en el segmento de memoria compartida preasignado, puede copiarse ahí y verse desde todos los elementos secundarios. – robince

2

Si los datos es de sólo lectura sólo lo hacen una variable en un módulo antes el tenedor de la piscina. Entonces, todos los procesos secundarios deberían poder acceder a él, y no se copiará siempre que no se escriba en él.

import myglobals # anything (empty .py file) 
myglobals.data = [] 

def count_it(key): 
    count = 0 
    for c in myglobals.data: 
     if c == key: 
      count += 1 
    return count 

if __name__ == '__main__': 
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 

pool = Pool() 
print pool.map(count_it, ["a", "b", "s", "d"]) 

Si quiere tratar de usar una matriz aunque se puede tratar con el argumento de palabra clave lock=False (bien es cierto por defecto).

+0

No creo que el uso de globales sea seguro y ciertamente no funcionaría en las ventanas donde los procesos no están bifurcados. –

+0

¿Cómo no es seguro? Si solo necesita acceso de lectura a los datos, está bien. Si escribe por error, la página modificada se copiará-en-escribir para el proceso hijo para que no ocurra nada malo (por ejemplo, no interferiría con otros procesos). Tiene razón, aunque no funcionará en Windows ... – robince

+0

Tiene razón en que es seguro en plataformas basadas en horquillas. Pero me gustaría saber si existe una forma compartida de memoria compartida para compartir grandes cantidades de datos después de que se crea el grupo de procesos. –

4

El problema que veo es que Pool no admite el decapado de datos compartidos a través de su lista de argumentos. Eso es lo que significa el mensaje de error por "los objetos solo deben compartirse entre procesos a través de la herencia". Los datos compartidos deben heredarse, es decir, ser globales si desea compartirlos utilizando la clase Pool.

Si necesita pasarlos explícitamente, puede que tenga que usar multiprocesamiento.Proceso. Aquí está su ejemplo reelaborado:

from multiprocessing import Process, Array, Queue 

def count_it(q, arr, key): 
    count = 0 
    for c in arr: 
    if c == key: 
     count += 1 
    q.put((key, count)) 

if __name__ == '__main__': 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    # want to share it using shared memory 
    toShare = Array('c', testData) 

    q = Queue() 
    keys = ['a', 'b', 's', 'd'] 
    workers = [Process(target=count_it, args = (q, toShare, key)) 
    for key in keys] 

    for p in workers: 
    p.start() 
    for p in workers: 
    p.join() 
    while not q.empty(): 
    print q.get(), 

Output: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

El orden de los elementos de la cola puede variar.

Para que esto sea más genérico y similar a la piscina, se podría crear un número fijo N de Procesos, dividir la lista de claves en N trozos, y luego usar una función de contenedor como el objetivo del proceso, la cual llamará count_it para cada clave en la lista que se pasa, como:

def wrapper(q, arr, keys): 
    for k in keys: 
    count_it(q, arr, k) 
-1

The multiprocessing.sharedctypes module provides functions for allocating ctypes objects from shared memory which can be inherited by child processes.

Entonces su uso de sharedctypes es incorrecto. ¿Desea heredar esta matriz del proceso principal o prefiere pasarla explícitamente? En el primer caso, debe crear una variable global como sugieren otras respuestas. Pero no necesita usar sharedctypes para pasarlo explícitamente, solo pase el original testData.

Por cierto, su uso de Pool.map() es incorrecto. Tiene la misma interfaz que la función incorporada map() (¿la confundiste con starmap()?). A continuación está el ejemplo de trabajo con, pasando matriz explícitamente:

from multiprocessing import Pool 

def count_it((arr, key)): 
    count = 0 
    for c in arr: 
     if c == key: 
      count += 1 
    return count 

if __name__ == '__main__': 
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" 
    pool = Pool() 
    print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]]) 
+0

Eso no es lo que quiere porque en teoría TestData será muy grande - y este método se traduce en ser en escabeche (que requiere más memoria) y se copia a cada proceso (que requiere al menos n x almacenamiento original). – robince

+0

@thrope: tienes razón, es por eso que mencioné ambas formas posibles. El ejemplo para usar la variable global debería ser obvio, por lo que no es necesario enumerarlo. –

+1

@Denis - Sí, pero por desgracia, el método global no funciona en Windows - que depende de tenedor y UNIX copia en escritura. Si utiliza el método global en Windows, el multiprocesamiento recogerá los datos y los enviará a cada subproceso secundario, lo que requerirá mucha más memoria. – robince

Cuestiones relacionadas