2012-08-16 15 views
18

Me gustaría ejecutar varias instancias de program.py simultáneamente, mientras que limita el número de instancias que se ejecutan al mismo tiempo (por ejemplo, a la cantidad de núcleos de CPU disponibles en mi sistema) Por ejemplo, si tengo 10 núcleos y tengo que hacer 1000 ejecuciones de program.py en total, solo se crearán y ejecutarán 10 instancias en cualquier momento dado.Multiprocesamiento en Python al tiempo que limita el número de procesos en ejecución

He intentado usar el módulo de multiprocesamiento, multiprocesamiento y uso de colas, pero no hay nada que me parezca que se presta para una implementación fácil. El mayor problema que tengo es encontrar la forma de limitar el número de procesos que se ejecutan simultáneamente. Esto es importante porque si creo 1000 procesos a la vez, se vuelve equivalente a una bomba de tenedor. No necesito los resultados devueltos por los procesos de forma programática (salen al disco), y todos los procesos se ejecutan independientemente el uno del otro.

¿Puede alguien darme sugerencias o un ejemplo de cómo podría implementar esto en python, o incluso bash? Publicaba el código que he escrito hasta el momento usando colas, pero no funciona como debería y puede que ya esté en el camino equivocado.

Muchas gracias.

+2

¿Has probado [grupos de procesos de Python] (http://docs.python.org/library/multiprocessing.html#module-multiprocessing.pool)? – C2H5OH

+0

La forma más sencilla de hacerlo es crear un programa "controlador" que cree el 'multiprocesamiento.pool' y engendre los hilos del trabajador (program.py), reasignando el trabajo cuando finalicen las instancias. – jozzas

+0

Gracias, intentaré esto; en mi primer intento, por alguna razón, llegué a la conclusión de que el multiprocesamiento no era lo que quería, pero ahora parece correcto. Entonces, en este caso, los hilos de trabajo simplemente generarían program.py (como un hilo? Con subproceso.Popen)? ¿Podría publicar un ejemplo aproximado o una implementación de plantilla que pueda seguir? – steadfast

Respuesta

2

script de Bash en lugar de Python, pero lo uso a menudo por simples procesamiento en paralelo:

#!/usr/bin/env bash 
waitForNProcs() 
{ 
nprocs=$(pgrep -f $procName | wc -l) 
while [ $nprocs -gt $MAXPROCS ]; do 
    sleep $SLEEPTIME 
    nprocs=$(pgrep -f $procName | wc -l) 
done 
} 
SLEEPTIME=3 
MAXPROCS=10 
procName=myPython.py 
for file in ./data/*.txt; do 
waitForNProcs 
./$procName $file & 
done 

o para casos muy simples, otra opción es xargs donde P establece el número de procs

find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 
3

Debe usar un supervisor de proceso. Un enfoque sería usar la API proporcionada por Circus para hacer eso "programáticamente", el sitio de documentación ahora está fuera de línea, pero creo que es solo un problema temporal, de todos modos, puede usar el Circus para manejar esto. Otro enfoque sería usar el supervisord y establecer el parámetro numprocs del proceso en la cantidad de núcleos que tenga.

Un ejemplo usando el Circo:

from circus import get_arbiter 

arbiter = get_arbiter("myprogram", numprocesses=3) 
try: 
    arbiter.start() 
finally: 
    arbiter.stop() 
21

Yo sé que usted ha mencionado que el enfoque Pool.map no tiene mucho sentido para usted. El mapa es solo una manera fácil de darle una fuente de trabajo, y un llamamiento para aplicar a cada uno de los artículos. El func para el mapa podría ser cualquier punto de entrada para hacer el trabajo real en el arg dado.

Si eso no parece adecuado para usted, tengo una respuesta bastante detallada aquí sobre el uso de un modelo productor-consumidor: https://stackoverflow.com/a/11196615/496445

En esencia, se crea una cola, y empezar N número de trabajadores. Luego, alimenta la cola desde el hilo principal o crea un proceso Productor que alimenta la cola. Los trabajadores simplemente continúan retirando el trabajo de la cola y nunca habrá más trabajo concurrente que el número de procesos que haya iniciado.

También tiene la opción de poner un límite en la cola, de modo que bloquea al productor cuando ya hay demasiado trabajo pendiente, si necesita poner restricciones también en la velocidad y los recursos que consume el productor.

La función de trabajo a la que se llama puede hacer lo que desee. Esto puede ser una envoltura alrededor de algún comando del sistema, o puede importar su lib de Python y ejecutar la rutina principal. Existen sistemas de administración de procesos específicos que le permiten configurar configuraciones para ejecutar sus ejecutables arbitrarios con recursos limitados, pero esto es solo un enfoque básico de Python para hacerlo.

Fragmentos de que other answer mío:

Piscina básica:

from multiprocessing import Pool 

def do_work(val): 
    # could instantiate some other library class, 
    # call out to the file system, 
    # or do something simple right here. 
    return "FOO: %s" % val 

pool = Pool(4) 
work = get_work_args() 
results = pool.map(do_work, work) 

El uso de un gestor de procesos y productor

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 

     # exit signal 
     if item == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = item 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    # this could also be started in a producer process 
    # instead of blocking 
    iters = itertools.chain(get_work_args(), (None,)*num_workers) 
    for item in iters: 
     work.put(item) 

    for p in pool: 
     p.join() 

    print results 
+0

Muy buen ejemplo, lo mejoré al obtener el número de CPUS como se explica en http://stackoverflow.com/questions/6905264/python-multiprocessing-utilizes-only-one-core y así poder establecer dinámicamente num_workers en función de las CPU de la máquina. –

0

Si bien hay muchas respuestas sobre el uso de multiprocesamiento .pool, no hay muchos fragmentos de código en h Cómo usar multiprocesamiento. Procesar, que de hecho es más beneficioso cuando importa el uso de la memoria. iniciar 1000 procesos sobrecargará la CPU y matará la memoria. Si cada proceso y sus canalizaciones de datos consumen mucha memoria, el sistema operativo o Python limitará el número de procesos paralelos. Desarrollé el siguiente código para limitar el número simultáneo de trabajos enviados a la CPU en lotes. El tamaño del lote se puede escalar proporcionalmente al número de núcleos de la CPU. En mi PC con Windows, la cantidad de trabajos por lote puede ser eficiente hasta 4 veces la cantidad de CPU disponible.

import multiprocessing 
def func_to_be_multiprocessed(q,data): 
    q.put(('s')) 
q = multiprocessing.Queue() 
worker = [] 
for p in range(number_of_jobs): 
    worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ 
     args=(q,data)...)) 
num_cores = multiprocessing.cpu_count() 
Scaling_factor_batch_jobs = 3.0 
num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs 
num_of_batches = number_of_jobs // num_jobs_per_batch 
for i_batch in range(num_of_batches): 
    floor_job = i_batch * num_jobs_per_batch 
    ceil_job = floor_job + num_jobs_per_batch 
    for p in worker[floor_job : ceil_job]: 
             worker.start() 
    for p in worker[floor_job : ceil_job]: 
             worker.join() 
for p in worker[ceil_job :]: 
          worker.start() 
for p in worker[ceil_job :]: 
          worker.join() 
for p in multiprocessing.active_children(): 
          p.terminate() 
result = [] 
for p in worker: 
    result.append(q.get()) 

El único problema es que si cualquiera de los puesto de trabajo en cualquier lote que no pudo completar y conduce a una situación que cuelga, no se iniciará resto de los lotes del trabajo. Por lo tanto, la función que se procesará debe tener rutinas de manejo de errores adecuadas.

Cuestiones relacionadas