2012-04-17 17 views
21

¿Hay una manera de asignar a cada trabajador en una piscina pitón multiprocesamiento un identificador único de manera que un trabajo está dirigido por un trabajador en particular en la piscina podría saber qué trabajador está ejecutando ¿eso? De acuerdo con la documentación, un Process tiene un name peroObtener un identificador único para el trabajador en la piscina de multiprocesamiento pitón

El nombre es una cadena que se utiliza para propósitos de identificación. No tiene semántica . Múltiples procesos pueden recibir el mismo nombre.

Para mi caso de uso particular, quiero ejecutar un montón de trabajos en un grupo de cuatro GPU, y necesito configurar el número de dispositivo para la GPU en la que se debe ejecutar el trabajo. Debido a que los trabajos no tienen una longitud uniforme, quiero asegurarme de no tener una colisión en una GPU de un trabajo que intente ejecutarlo antes de que termine el anterior (por lo que esto imposibilita la asignación previa de una ID a la unidad de trabajo antes de tiempo).

+1

¿Por qué no usar algo al azar, como UUID? –

+0

@LuperRouch - ¿Podrías ampliar lo que quieres decir con eso? – JoshAdel

+1

Por ejemplo '' process = Process (target = foo, name = uuid.uuid4(). Hex) '' daría nombres únicos a sus procesos. –

Respuesta

38

Parece que lo que quiere es simple: multiprocessing.current_process(). Por ejemplo:

import multiprocessing 

def f(x): 
    print multiprocessing.current_process() 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

de salida:

$ python foo.py 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-3, started daemon)> 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-4, started daemon)> 
[0, 1, 4, 9, 16, 25] 

Esto devuelve el propio objeto de proceso, por lo que el proceso puede ser su propia identidad. También puede llamar id en él durante un identificador numérico único - en CPython, esta es la dirección de memoria del objeto de proceso, por lo que no hace pensar hay alguna posibilidad de solapamiento. Finalmente, puede usar la propiedad ident o pid del proceso, pero eso solo se establece una vez que se inicia el proceso.

Además, al revisar la fuente, me parece muy probable que los nombres autogenerados (como se ejemplifique con el primer valor en las cadenas de representación Process arriba) sean únicos. multiprocessing mantiene un objeto itertools.counter para cada proceso, que se utiliza para generar una tupla _identity para cualquier proceso hijo que genere. Por lo tanto, el proceso de nivel superior produce un proceso hijo con identificadores de valor único y generan un proceso con identificadores de dos valores, y así sucesivamente. Luego, si no se pasa ningún nombre al constructor Process, simplemente autogenerates the name basado en _identity, usando ':'.join(...). Luego Poolalters the name del proceso usando replace, dejando el id autogenerado igual.

El resultado de todo esto es que, aunque dos Process es pueden tener el mismo nombre, ya que puede asignar el mismo nombre a ellas cuando se crea, ellos son únicos, si no se toca el nombre parámetro. Además, teóricamente puedes usar _identity como identificador único; ¡pero entiendo que hicieron esa variable privada por una razón!

Un ejemplo de lo anterior en acción:

import multiprocessing 

def f(x): 
    created = multiprocessing.Process() 
    current = multiprocessing.current_process() 
    print 'running:', current.name, current._identity 
    print 'created:', created.name, created._identity 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

Salida:

$ python foo.py 
running: PoolWorker-1 (1,) 
created: Process-1:1 (1, 1) 
running: PoolWorker-2 (2,) 
created: Process-2:1 (2, 1) 
running: PoolWorker-3 (3,) 
created: Process-3:1 (3, 1) 
running: PoolWorker-1 (1,) 
created: Process-1:2 (1, 2) 
running: PoolWorker-2 (2,) 
created: Process-2:2 (2, 2) 
running: PoolWorker-4 (4,) 
created: Process-4:1 (4, 1) 
[0, 1, 4, 9, 16, 25] 
1

Usted puede utilizar multiprocessing.Queue para almacenar los identificadores y luego obtener la identificación en la inicialización del proceso de piscina.

Ventajas:

  • Usted no tiene que depender de elementos internos.
  • Si su caso de uso es administrar recursos/dispositivos, puede ingresar el número de dispositivo directamente. Esto también asegurará que ningún dispositivo se use dos veces: si tiene más procesos en su grupo que dispositivos, los procesos adicionales se bloquearán en queue.get() y no realizará ningún trabajo (Esto no bloqueará su porgrama, o al menos no lo hizo). cuando probé).

Desventajas:

  • Usted tienen sobrecarga de comunicación adicional y el desove de la piscina procesos de toma un poquito más largo: Sin la sleep(1) en el ejemplo todo el trabajo puede ser realizado por el primer proceso, como los demás aún no se han inicializado.
  • Se necesita un mundial (o al menos yo no sé una manera alrededor de ella)

Ejemplo:

import multiprocessing 
from time import sleep 

def init(queue): 
    global idx 
    idx = queue.get() 

def f(x): 
    global idx 
    process = multiprocessing.current_process() 
    sleep(1) 
    return (idx, process.pid, x * x) 

ids = [0, 1, 2, 3] 
manager = multiprocessing.Manager() 
idQueue = manager.Queue() 

for i in ids: 
    idQueue.put(i) 

p = multiprocessing.Pool(8, init, (idQueue,)) 
print(p.map(f, range(8))) 

Salida:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)] 

Tenga en cuenta, que hay son solo 4 pid diferentes, aunque el grupo contiene 8 procesos y un idx solo lo usa un proceso.

0

Hice esto con el enhebrado y terminé usando a queue para manejar la gestión de trabajos. Aquí está la línea de base. Mi versión completa tiene un montón de try-catches (especialmente en el trabajador, para asegurarse de que se llama al q.task_done() incluso en caso de error).

from threading import Thread 
from queue import Queue 
import time 
import random 


def run(idx, *args): 
    time.sleep(random.random() * 1) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     while True: 
      args = q.get() 
      run(idx, *args) 
      q.task_done() 

    for job in jobs: 
     q.put(job) 

    for i in range(0, workers): 
     t = Thread(target=worker, args=[i]) 
     t.daemon = True 
     t.start() 

    q.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

no tenía necesidad de usar multiprocesamiento (mis trabajadores son sólo para llamar a un proceso externo), pero esto podría extenderse. La API para multiprocesamiento cambia un toque, aquí es cómo se puede adaptar:

from multiprocessing import Process, Queue 
from Queue import Empty 
import time 
import random 

def run(idx, *args): 
    time.sleep(random.random() * i) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     try: 
      while True: 
       args = q.get(timeout=1) 
       run(idx, *args) 
     except Empty: 
      return 

    for job in jobs: 
     q.put(job) 

    processes = [] 
    for i in range(0, workers): 
     p = Process(target=worker, args=[i]) 
     p.daemon = True 
     p.start() 
     processes.append(p) 

    for p in processes: 
     p.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

Ambas versiones de salida algo como:

0 : ('job', 0) 
1 : ('job', 2) 
1 : ('job', 6) 
3 : ('job', 3) 
0 : ('job', 5) 
1 : ('job', 7) 
2 : ('job', 1) 
4 : ('job', 4) 
3 : ('job', 8) 
0 : ('job', 9) 
Cuestiones relacionadas