2010-11-27 9 views
14

Realicé algunas búsquedas en línea, encontré simples 'tutoriales' para usar canalizaciones con nombre. Sin embargo, cuando hago algo con trabajos en segundo plano, parece que pierdo muchos datos.Usar canalizaciones con nombre con bash - Problema con pérdida de datos

[[Editar: encontró una solución mucho más simple, ver respuesta a la publicación. Entonces la pregunta que presento ahora es académica - en caso de que uno quiera un servidor de trabajo]]

Usando Ubuntu 10.04 con Linux 2.6.32-25-genérico # 45-Ubuntu SMP Sáb 16 Oct 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash, versión 4.1.5 (1) - liberación (x86_64-pc-linux-gnu).

Mi función bash es:

function jqs 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read txt <"$pipe" 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     fi 
    fi 
    done 
} 

corro esto en el fondo:

> jqs& 
[1] 5336 

Y ahora que lo alimentan:

for i in 1 2 3 4 5 6 7 8 
do 
    (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &) 
done 

La salida es inconsistente. Con frecuencia no obtengo todos los ecos de éxito. Obtengo como máximo tantos ecos de texto nuevos como ecos de éxito, a veces menos.

Si elimino el '&' del 'feed', parece que funciona, pero estoy bloqueado hasta que se lea la salida. De ahí que quiera dejar que los subprocesos se bloqueen, pero no el proceso principal.

El objetivo es escribir una secuencia de comandos de control de trabajo simple para poder ejecutar un máximo de 10 trabajos en paralelo y poner el resto en cola para su posterior procesamiento, pero sé que se ejecutan.

gestor de trabajos completa a continuación:

function jq_manage 
{ 
    export __gn__="$1" 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 
    trap "rm -f $pipe" EXIT 
    trap "break"  SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    date 
    jobs 
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__)) 
    then 
     echo "Waiting for new job" 
     if read new_job <"$pipe" 
     then 
    echo "new job is [[$new_job]]" 

    if [[ "$new_job" == 'quit' ]] 
    then 
     break 
    fi 

    echo "In group $__gn__, starting job $new_job" 
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &" 
     fi 
    else 
     sleep 3 
    fi 
    done 
} 

function jq 
{ 
    # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs) 
    # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently 

    export __gn__="$1" 
    shift 
    export __jN__="$1" 
    shift 

    export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l) 
    if (($__jq__ '<' 1)) 
    then 
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &" 
    fi 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 

    echo [email protected] >$pipe 
} 

Calling

jq <name> <max processes> <command> 
jq abc 2 sleep 20 

se iniciará un proceso. Esa parte funciona bien. Comienza un segundo, bien. Uno por uno a mano parece funcionar bien. Pero comenzar 10 en un bucle parece perder el sistema, como en el ejemplo más simple anterior.

Cualquier sugerencia sobre lo que puedo hacer para resolver esta aparente pérdida de datos IPC sería muy apreciada.

Saludos, Alain.

+0

Eche un vistazo en [segundo 2018 Edit to * Cómo configurar una variable a la salida de un comando *] (https://stackoverflow.com/a/41236640/1765658) o en [GitHub.com: Connector- bash] (https://github.com/F-Hauri/Connector-bash). Donde conecto * subprocess hoding * tools * a mi sesión de shell actual. –

Respuesta

26

Su problema es if comunicado a continuación:

while true 
do 
    if read txt <"$pipe" 
    .... 
done 

Lo que sucede es que el servidor de cola de trabajos está abriendo y cerrando el tubo cada vez alrededor del bucle. Esto significa que algunos de los clientes reciben un error de "tubería rota" cuando intentan escribir en la tubería, es decir, el lector de la tubería se va después de que el escritor lo abre.

Para solucionar este problema, cambiar su bucle en el servidor de abrir la tubería una vez para todo el bucle:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 

hace de esta manera, el tubo se abre una vez y se mantiene abierta.

Deberá tener cuidado con lo que ejecuta dentro del bucle, ya que todo el procesamiento dentro del bucle se habrá adjuntado al conducto designado. Deberá asegurarse de redireccionar el código fuente de todos sus procesos dentro del ciclo desde otro lugar, de lo contrario, pueden consumir los datos del conducto.

Editar: Con el problema ahora de que obtiene EOF en sus lecturas cuando el último cliente cierra la tubería, puede usar el método jilles para duplicar los descriptores de archivos, o simplemente puede asegurarse de que usted también sea un cliente mantener el lado de escritura de la tubería abierta:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 3> "$pipe" 

esto mantendrá el lado de escritura del tubo abierto en fd 3. la misma advertencia se aplica con este descriptor de fichero al igual que con la entrada estándar. Deberá cerrarlo para que los procesos secundarios no lo hereden. Probablemente importe menos que con stdin, pero sería más limpio.

+0

Guau, excelente respuesta. Tiene sentido. Gracias. Lo intentaré de inmediato. – asoundmove

+0

Ok, ahora que resolvió el problema clave, tengo otro: ¿cómo se obtiene la lectura para esperar la entrada? Voy a publicar una respuesta a mí mismo más adelante con código de muestra. – asoundmove

+1

@asoundmove: He actualizado la respuesta con la solución a EOF en lectura. – camh

0

Por un lado el problema es peor de lo que pensaba: Ahora parece haber un caso en mi ejemplo más complejo (jq_manage) donde los mismos datos se leen una y otra vez del conducto (aunque no se están escribiendo nuevos datos).

Por otro lado, he encontrado una solución simple (editado siguiente comentario de Dennis):

function jqn # compute the number of jobs running in that group 
{ 
    __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l) 
} 

function jq 
{ 
    __groupn__="$1"; shift # job group name (the pool within which to allocate $__jmax__ jobs) 
    __jmax__="$1"; shift # maximum of job numbers to run concurrently 

    jqn 
    while (($__jqty__ '>=' $__jmax__)) 
    do 
    sleep 1 
    jqn 
    done 

    eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; [email protected]) &" 
} 

funciona como un encanto. No hay toma ni tubería involucrada. Simple.

+1

No hay razón para exportar '__jqty__' (o cualquiera de las exportaciones en el original). ¿Por qué haces eco de algo directamente en '/ dev/null'? ¿Por qué usar 'eval'? ¿Por qué no simplemente hacer '$ @ &'? No es necesario citar '> ='. Estoy de acuerdo con la respuesta de Camh. –

+0

Todo se reduce a leer y filtrar la salida de ps. echo a/dev/null porque en realidad no quiero la salida, solo quiero las cadenas correctas en la salida de 'ps'. Lo mismo ocurre con eval, de lo contrario, ps muestra los nombres de variables, no las variables expandidas, eval hace la expansión. Nunca utilicé ((...)) antes, así que gracias por señalar que no necesito cotizaciones, estaba saliendo de un ejemplo que leí en algún lado y gracias también por la exportación, es un sobrante del anterior scripts más complicados que tenían subprocesos y requerían la exportación. – asoundmove

+0

Perdón, quise decir "trabajos", no "ps" – asoundmove

1

Como camh & Dennis Williamson dice no rompa la tubería.

Ahora tengo ejemplos más pequeños y directos en la línea de comandos:

Servidor:

(
    for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9}; 
    do 
    if read s; 
     then echo ">>$i--$s//"; 
    else 
     echo "<<$i"; 
    fi; 
    done < tst-fifo 
)& 

Cliente:

(
    for i in {%a,#b}{1,2}{0,1}; 
    do 
    echo "Test-$i" > tst-fifo; 
    done 
)& 

Puede substituir la línea de llave con:

(echo "Test-$i" > tst-fifo&); 

Todos clien Los datos enviados a la tubería se leen, aunque con la opción dos del cliente uno puede necesitar iniciar el servidor un par de veces antes de que se lean todos los datos.

Pero aunque la lectura espera para empezar los datos en la tubería, una vez que se han insertado los datos, lee la cadena vacía para siempre.

¿Alguna manera de detener esto?

Gracias por cualquier idea de nuevo.

6

Como dije en otras respuestas, necesita mantener la fifo abierta en todo momento para evitar perder datos.

Sin embargo, una vez que todos los escritores han salido después de que se haya abierto el fifo (por lo que hubo un escritor), lee el retorno inmediatamente (y poll() devuelve POLLHUP). La única manera de borrar este estado es volver a abrir el fifo.

POSIX no proporciona una solución a esto, pero al menos Linux y FreeBSD sí lo hacen: si las lecturas comienzan a fallar, abra nuevamente el fifo mientras se mantiene abierto el descriptor original. Esto funciona porque en Linux y FreeBSD el estado "hangup" es local para una descripción de archivo abierta particular, mientras que en POSIX es global para fifo.

Esto se puede hacer en una secuencia de comandos shell como esto:

while :; do 
    exec 3<tmp/testfifo 
    exec 4<&- 
    while read x; do 
     echo "input: $x" 
    done <&3 
    exec 4<&3 
    exec 3<&- 
done 
+1

En Bash, en lugar de '{... leer ...} <& 3', puede usar' leer -u 3' para leer desde un número de descriptor de archivo especificado en lugar de 0. – ephemient

+0

@ephemient Qué ventaja hace 'leer -u 3 x' proporcionar arriba 'leer x <& 3 '? – jilles

+0

¡Guau, esto funciona!¿Puedes explicar por qué no puedo usar fd 1 en lugar de 3? Funciona la primera vez, pero luego falla. Publicaré un comentario por separado para mostrar el último guión completo. – asoundmove

1

Sólo para aquellos que pudieran estar interesados, [[re-editado]] siguientes comentarios de camh y Jilles, aquí hay dos nuevas versiones de el script del servidor de prueba

Ambas versiones ahora funcionan exactamente como se esperaba. Versión

de camh para la gestión de la tubería: Versión

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    fi 
    done 3< "$pipe" 4> "$pipe" # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF 
} 

de Jille para la gestión de la tubería:

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    exec 3< "$pipe" 
    exec 4<&- 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    else 
     # Close the pipe and reconnect it so that the next read does not end up returning EOF 
     exec 4<&3 
     exec 3<&- 
     exec 3< "$pipe" 
     exec 4<&- 
    fi 
    done 
} 

Gracias a todos por su ayuda.

+2

No se puede atrapar SIGKILL. No tiene sentido intentarlo. Además, vea mi última edición para un enfoque más simple que no necesita duplicar los descriptores de archivos. – camh

+0

Ok. Probado y, por supuesto, tienes razón. Gracias por los consejos camh. – asoundmove

0

ejecución decir 10 puestos de trabajo en paralelo a lo sumo, y el resto cola para su posterior procesamiento, pero fiable saber que hacen correr

Usted puede hacer esto con GNU paralelo. No necesitarás este scripting.

http://www.gnu.org/software/parallel/man.html#options

Puede establecer max-procs "Número de jobslots. Corre hacia empleos N en paralelo." Existe una opción para establecer la cantidad de núcleos de CPU que desea usar. Puede guardar la lista de trabajos ejecutados en un archivo de registro, pero esa es una función beta.

Cuestiones relacionadas