2012-03-21 5 views
10

Entiendo que usar subprocess es la forma preferida de llamar a un comando externo.Controlar el número de subprocesos usando para llamar comandos externos en python

Pero, ¿y si quiero ejecutar varios comandos en parall, pero limitar el número de procesos que se generan? Lo que me molesta es que no puedo bloquear los subprocesos. Por ejemplo, si me llaman

subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile) 

A continuación, el proceso continuará, sin esperar a que cmd a fin. Por lo tanto, no puedo concluir en un trabajador de la biblioteca multiprocessing.

Por ejemplo, si hago:

def worker(cmd): 
    subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile); 

pool = Pool(processes = 10); 
results =[pool.apply_async(worker, [cmd]) for cmd in cmd_list]; 
ans = [res.get() for res in results]; 

entonces cada trabajador va a terminar y regresar después del desove un subproceso. Así que no puedo limitar el número de procesos generados por subprocess usando Pool.

¿Cuál es la forma correcta de limitar el número de subprocesos?

Respuesta

6

Puede usar subprocess.call si desea esperar a que se complete el comando. Vea pydoc subprocess para más información.

También puede llamar al método Popen.wait en su trabajador:

def worker(cmd): 
    p = subprocess.Popen(cmd, stderr=outputfile, stdout=outputfile); 
    p.wait() 
+0

Esto desactiva el procesamiento paralelo completamente – qed

+2

No debería. La pregunta es usar el módulo 'multiprocesamiento', y cada trabajador se genera en un proceso separado, por lo que' esperar() 'en un trabajador no impedirá que otros trabajadores se ejecuten. Dicho esto, esto no es correcto en sí mismo: este ejemplo no 'devuelve' nada al trabajador, por lo que al llamar '.get()' a los resultados no se devolverá nada. – larsks

7

No necesita múltiples procesos de Python o incluso hilos para limitar el número máximo de subprocesos paralelos:

from itertools import izip_longest 
from subprocess import Popen, STDOUT 

groups = [(Popen(cmd, stdout=outputfile, stderr=STDOUT) 
      for cmd in commands)] * limit # itertools' grouper recipe 
for processes in izip_longest(*groups): # run len(processes) == limit at a time 
    for p in filter(None, processes): 
     p.wait() 

Ver Iterate an iterator by chunks (of n) in Python?

Si desea limitar tanto el número máximo como el mínimo de subprocesos paralelos, podría usar un grupo de subprocesos:

from multiprocessing.pool import ThreadPool 
from subprocess import STDOUT, call 

def run(cmd): 
    return cmd, call(cmd, stdout=outputfile, stderr=STDOUT) 

for cmd, rc in ThreadPool(limit).imap_unordered(run, commands): 
    if rc != 0: 
     print('{cmd} failed with exit status: {rc}'.format(**vars())) 

tan pronto como termine cualquiera de limit subprocesos, un nuevo sub-proceso se pone en marcha para mantener limit número de subprocesos en todo momento.

O usando ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor # pip install futures 
from subprocess import STDOUT, call 

with ThreadPoolExecutor(max_workers=limit) as executor: 
    for cmd in commands: 
     executor.submit(call, cmd, stdout=outputfile, stderr=STDOUT) 

He aquí una sencilla aplicación grupo de subprocesos:

import subprocess 
from threading import Thread 

try: from queue import Queue 
except ImportError: 
    from Queue import Queue # Python 2.x 


def worker(queue): 
    for cmd in iter(queue.get, None): 
     subprocess.check_call(cmd, stdout=outputfile, stderr=subprocess.STDOUT) 

q = Queue() 
threads = [Thread(target=worker, args=(q,)) for _ in range(limit)] 
for t in threads: # start workers 
    t.daemon = True 
    t.start() 

for cmd in commands: # feed commands to threads 
    q.put_nowait(cmd) 

for _ in threads: q.put(None) # signal no more commands 
for t in threads: t.join() # wait for completion 

Para evitar la salida prematura, agregue el manejo de excepciones.

Si desea capturar la salida del subproceso en una cadena, consulte Python: execute cat subprocess in parallel.

Cuestiones relacionadas