2009-01-21 17 views
42

Tengo una lista/cola de 200 comandos que necesito ejecutar en un shell en un servidor Linux.Procesamiento paralelo desde una cola de comandos en Linux (bash, python, ruby ​​... lo que sea)

Solo quiero tener un máximo de 10 procesos en ejecución (desde la cola) a la vez. Algunos procesos tardarán unos segundos en completarse, otros procesos tomarán mucho más tiempo.

Cuando finaliza un proceso, quiero que el siguiente comando se "extraiga" de la cola y se ejecute.

¿Alguien tiene código para resolver este problema?

La elaboración adicional:

Hay 200 piezas de trabajo que hay que hacer, en una cola de algún tipo. Quiero tener al menos 10 trabajos a la vez. Cuando un hilo termina una pieza de trabajo, debe pedirle a la cola la siguiente pieza de trabajo. Si no hay más trabajo en la cola, el hilo debería morir. Cuando todos los hilos han muerto, significa que todo el trabajo se ha realizado.

El problema real que intento resolver es usar imapsync para sincronizar 200 buzones de correo de un servidor de correo antiguo a uno nuevo. Algunos usuarios tienen buzones de correo grandes y tardan mucho tiempo en sincronizarse, otros tienen buzones de correo muy pequeños y se sincronizan rápidamente.

Respuesta

38

Me imagino que podría hacer esto con make y make -j xx.

Tal vez un makefile como esto

all : usera userb userc.... 

usera: 
     imapsync usera 
userb: 
     imapsync userb 
.... 

make -j 10 -f makefile

+1

Esto funcionó exactamente como esperaba. Escribí un código para generar el Makefile. Terminó siendo más de 1000 líneas. ¡Gracias! – mlambie

+1

+1 ¡Esto es muy inteligente! – progo

+1

Encontré que si alguno de los comandos sale con un código de error, make saldrá, impidiendo la ejecución de trabajos futuros. En algunas situaciones, esta solución es menos que ideal. ¿Alguna recomendación para este escenario? –

-3

¿Puede explicar lo que quiere decir con en paralelo? Parece que necesita implementar algún tipo de bloqueo en la cola para que sus entradas no se seleccionen dos veces, etc. y los comandos se ejecuten solo una vez.

La mayoría de los sistemas de cola hacen trampa: solo escriben una lista de tareas pendientes gigante, luego seleccionan, p. diez elementos, trabaje con ellos y seleccione los siguientes diez elementos. No hay paralelización

Si proporciona más detalles, estoy seguro de que podemos ayudarlo.

7

GNU make (y tal vez otras implementaciones también) tiene el argumento -j, que determina cuántos trabajos se ejecutarán a la vez. Cuando un trabajo finaliza, make iniciará otro.

4

Bueno, si son en gran medida independientes entre sí, pensaría en términos de:

Initialize an array of jobs pending (queue, ...) - 200 entries 
Initialize an array of jobs running - empty 

while (jobs still pending and queue of jobs running still has space) 
    take a job off the pending queue 
    launch it in background 
    if (queue of jobs running is full) 
     wait for a job to finish 
     remove from jobs running queue 
while (queue of jobs is not empty) 
    wait for job to finish 
    remove from jobs running queue 

Tenga en cuenta que la prueba de la cola en el bucle principal significa que si los 'trabajos ejecución de la cola' dispone de espacio cuando el ciclo while itera, lo que evita la terminación prematura del ciclo. Creo que la lógica es sólida.

Puedo ver cómo hacerlo en C con bastante facilidad, tampoco sería tan difícil en Perl (y por lo tanto no demasiado difícil en los otros lenguajes de scripting: Python, Ruby, Tcl, etc.). No estoy del todo seguro de que quisiera hacerlo en shell: el comando wait en el shell espera a que todos los hijos finalicen, en lugar de que un hijo termine.

+0

puede usar el comando de espera para un proceso hijo específico también. Se le puede dar cualquier número de argumentos, cada uno de los cuales puede ser un pid o id de trabajo. –

+1

@BrianMinton: tienes razón en que puedes listar PIDs específicos con 'wait', pero todavía obtienes el comportamiento de 'todos muertos', no 'primero muerto', que es lo que realmente necesita este código. –

42

En el shell, xargs se puede utilizar para poner en cola el procesamiento de comandos en paralelo.Por ejemplo, para tener siempre 3 Capacidad en paralelo, dormir durante 1 segundo cada una, y la ejecución de 10 plazas en total hacer

echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _ 

Y sería dormir durante 4 segundos en total. Si usted tiene una lista de nombres, y quiere pasar a los nombres de los comandos ejecutados, de nuevo la ejecución de 3 comandos en paralelo, hace

cat names | xargs -n1 -P3 process_name 

ejecutaría el comando process_name alice, process_name bob y así sucesivamente.

+0

¡Guau, uso 'xargs' todo el tiempo y nunca esperé que tuviera esta opción! – Warrick

+0

Para el segundo ejemplo que das, ¿cómo modificas el comando para que 'process_name' pueda tomar más de un argumento? Quiero hacer algo como esto: 'cat commands.txt | xargs -n1 -P3 eval' donde 'commands.txt' tiene un conjunto de comandos (uno en cada línea, cada uno con múltiples argumentos). El problema es que 'eval' no funciona, ya que es un comando de shell incorporado – Eddy

3

en Python, puede probar:

import Queue, os, threading 

# synchronised queue 
queue = Queue.Queue(0) # 0 means no maximum size 

# do stuff to initialise queue with strings 
# representing os commands 
queue.put('sleep 10') 
queue.put('echo Sleeping..') 
# etc 
# or use python to generate commands, e.g. 
# for username in ['joe', 'bob', 'fred']: 
# queue.put('imapsync %s' % username) 

def go(): 
    while True: 
    try: 
     # False here means no blocking: raise exception if queue empty 
     command = queue.get(False) 
     # Run command. python also has subprocess module which is more 
     # featureful but I am not very familiar with it. 
     # os.system is easy :-) 
     os.system(command) 
    except Queue.Empty: 
     return 

for i in range(10): # change this to run more/fewer threads 
    threading.Thread(target=go).start() 

No comprobado ...

(por supuesto, Python en sí tiene un único subproceso. Debería obtener el beneficio de múltiples hilos en términos de wa iting para IO, sin embargo)

+0

1) El sistema os.s puede ser reemplazado por el nuevo módulo mejorado de subprocesos. 2) No importa que CPython tenga un GIL porque está ejecutando comandos externos, no el código de Python (funciones). –

+0

Si reemplaza 'threading.Thread' por' multiprocessing.Process' y 'Queue' por' multiprocessing.Queue', entonces el código se ejecutará usando múltiples procesos. – jfs

+0

pssh está escrito en python Creo que – rogerdpack

1

multiprocessing module de Python parece que encaja bien con su problema. Es un paquete de alto nivel que admite el enhebrado por proceso.

13

Para este tipo de trabajo PPSS está escrito: secuencia de comandos de shell de procesamiento paralelo. Busque este nombre en Google y lo encontrará, no lo haré con linkspam.

+0

jaja, genial. saluda de litb: P –

+0

Esto parece ideal. Lo comprobaré la próxima vez que me enfrente con un problema similar. Gracias por actualizar este hilo con tu recomendación. – mlambie

+0

¡increíble! podemos configurarlo enhebrado dinámico que, p. 80% de CPU/Ram permiten? – Devrim

24

Parallel es hecho exatcly para este propósito.

cat userlist | parallel imapsync 

Una de las ventajas de Parallel en comparación con otras soluciones es que hace que no se mezcla Seguro salida. Haciendo traceroute en Parallel funciona bien, por ejemplo:

(echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute 
+1

Hombre Me encanta esta herramienta. Lo he sabido por 3 horas y lo voy a usar hasta que las piedras de la tierra me griten que me detenga. – chiggsy

+1

Fedora 16 incluyó la herramienta en el repositorio de paquetes – myroslav

0

Función simple en zsh para paralelizar trabajos en no más de 4 subcapas, usando los archivos de bloqueo en/tmp.

La única parte no trivial son las banderas glob en la primera prueba:

  • #q: permitir que el nombre de archivo comodines en una prueba
  • [4]: devuelve el cuarto resultado sólo
  • N: ignorar el error de resultado vacío

Debería ser fácil convertirlo a posix, aunque sería un poco más detallado.

No olvides escapar cualquier cotización en los trabajos con \".

#!/bin/zsh 

setopt extendedglob 

para() { 
    lock=/tmp/para_$$_$((paracnt++)) 
    # sleep as long as the 4th lock file exists 
    until [[ -z /tmp/para_$$_*(#q[4]N) ]] { sleep 0.1 } 
    # Launch the job in a subshell 
    (touch $lock ; eval $* ; rm $lock) & 
    # Wait for subshell start and lock creation 
    until [[ -f $lock ]] { sleep 0.001 } 
} 

para "print A0; sleep 1; print Z0" 
para "print A1; sleep 2; print Z1" 
para "print A2; sleep 3; print Z2" 
para "print A3; sleep 4; print Z3" 
para "print A4; sleep 3; print Z4" 
para "print A5; sleep 2; print Z5" 

# wait for all subshells to terminate 
wait 
Cuestiones relacionadas