2010-08-11 13 views
6

Primero, un poco de contexto para explicar por qué estoy en la ruta de "muestreo UDP":
Me gustaría muestrear datos producidos a gran velocidad durante un período de tiempo desconocido. Los datos que deseo muestrear están en otra máquina que no sea la que consume los datos. Tengo una conexión Ethernet dedicada entre los dos para que el ancho de banda no sea un problema. El problema que tengo es que la máquina que consume los datos es mucho más lenta que la que los produce. Una limitación adicional es que, aunque está bien que no obtenga todas las muestras (son solo ejemplos), es obligatorio obtener el último.Cómo hacer que un socket UDP reemplace mensajes antiguos (aún no recv() 'd) cuando llega nuevo?

Mi primera solución fue hacer que el productor de datos envíe un datagrama UDP para cada muestra producida y dejar que el consumidor trate de obtener las muestras que pueda y que la capa de socket descarte cuando el socket UDP esté lleno. El problema con esta solución es que cuando llegan nuevos datagramas UDP y el socket está lleno, son los nuevos datagramas que se descartan y no los antiguos. ¡Por lo tanto, no estoy garantizado para tener el último!

Mi pregunta es: ¿hay alguna manera de hacer que un socket UDP reemplace los viejos datagramas cuando lleguen nuevos?

El receptor es actualmente una máquina Linux, pero eso podría cambiar en favor de otro sistema operativo tipo Unix en el futuro (ventanas pueden ser posibles, ya que implementa sockets BSD, pero es menos probable)
La solución ideal sería el uso generalizado mecanismos (como setsockopt() s) para trabajar.

PD: Pensé en otras soluciones pero son más complejas (implican una gran modificación del remitente), por lo tanto, me gustaría primero tener un estado definido sobre la fiabilidad de lo que pido. :)

actualizaciones: - Yo sé que el sistema operativo en el equipo receptor puede manejar la carga de la red + volver a montar el tráfico generado por el remitente. Es solo que su comportamiento predeterminado es descartar nuevos datagramas cuando el buffer de socket está lleno. Y debido a los tiempos de procesamiento en el proceso de recepción, sé que se llenará haga lo que haga (desperdiciar la mitad de la memoria en un buffer de socket no es una opción :)).
- Realmente me gustaría evitar tener un proceso de ayuda haciendo lo que el sistema operativo podría haber hecho en el momento de despachar paquetes y desperdiciar recursos simplemente copiando mensajes en un SHM.
- El problema que veo al modificar el remitente es que el código al que tengo acceso es solo una función PleaseSendThisData(), no tiene conocimiento de que puede ser la última vez que se llama antes de un tiempo prolongado, entonces yo don No veo ningún truco factible en ese extremo ... ¡pero estoy abierto a sugerencias! :)

Si realmente no hay forma de cambiar el comportamiento de recepción de UDP en un socket BSD, entonces bueno ... sólo dígame, estoy preparado para aceptar esta terrible verdad y comenzaré a trabajar en el "proceso de ayuda" solución cuando vuelvo a ella :)

+1

¿Cuál es la plataforma y el idioma del host receptor? – msw

+1

Pregunta muy bien enmarcada –

+0

¿Sería "consumir" o "leer" un reemplazo adecuado para "explotar" en su pregunta? "Exploit" parece confuso aquí. – nategoose

Respuesta

2

Esto será difícil de lograr completamente justo en el lado del oyente, ya que podría perderse el último paquete en el Chip de interfaz de red, lo que evitará que su programa haya tenido la oportunidad de verlo.

El código UDP del sistema operativo sería el mejor lugar para tratar con esto, ya que recibirá nuevos paquetes incluso si decide descartarlos porque ya tiene demasiados en cola. Entonces podría tomar la decisión de dejar caer uno viejo o dejar uno nuevo, pero no sé cómo decirle que esto es lo que le gustaría que hiciera.

Puede intentar tratar esto en el receptor teniendo un programa o hilo que siempre intenta leer en el paquete más nuevo y otro que siempre intenta obtener el paquete más nuevo. Cómo hacer esto diferiría según si lo hizo como dos programas separados o como dos hilos.

como hilos que se necesita un mutex (semáforo o algo parecido) para proteger un puntero (o de referencia) a una estructura que se utiliza para contener 1 UDP carga útil y cualquier otra cosa que quería allí en (tamaño, remitente IP, el puerto emisor , marca de tiempo, etc.).

El hilo que realmente lee los paquetes del socket almacena los datos del paquete en una estructura, adquiere el mutex que protege ese puntero, intercambia el puntero actual por un puntero a la estructura que acaba de crear, libera el mutex, señala el hilo de procesador que tiene algo que ver, y luego limpiar la estructura que acaba de recibir un puntero a y utilizarlo para realizar el próximo paquete que entra.

el hilo que las cargas útiles de paquetes realidad procesados ​​deben esperar en la señal del otro hilo y/o periódicamente (500 ms o más es probablemente un buen punto de partida para esto, pero usted decide) y adquirir el mutex, cambiar su puntero a una estructura de carga UDP con la que está allí, liberar el mutex, y luego, si la estructura tiene algún paquete de datos, es Debería procesarlo y luego esperar en la próxima señal. Si no tenía datos, debería seguir adelante y esperar la siguiente señal.

El hilo del procesador probablemente debería ejecutarse con una prioridad menor que el oyente UDP para que el oyente tenga menos probabilidades de perder un paquete. Al procesar el último paquete (el que realmente le importa), el procesador no se interrumpirá porque no hay paquetes nuevos para que el oyente pueda escuchar.

Puede ampliar esto utilizando una cola en lugar de un solo puntero como el lugar de intercambio para los dos hilos. El puntero único es solo una cola de longitud 1 y es muy fácil de procesar.

También podría extender esto intentando que la cadena de escucha detecte si hay varios paquetes esperando y solo colocando realmente el último de ellos en la cola para la cadena del procesador.La forma de hacerlo será diferente según la plataforma, pero si usted está usando un * nix entonces esto debe devolver 0 para sockets sin nada de espera:

while (keep_doing_this()) { 
    ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
    // this could have been recv or recvfrom 
    if (len < 0) { 
     error(); 
    } 
    int sz; 
    int rc = ioctl(udp_socket_fd, FIONREAD, &sz); 
    if (rc < 0) { 
     error(); 
    } 
    if (!sz) { 
     // There aren't any more packets ready, so queue up the one we got 
     my_udp_packet->current_len = len; 

     my_udp_packet = swap_udp_packet(my_ucp_packet); 
     /* swap_udp_packet is code you would have to write to implement what I talked 
      about above. */ 

     tgkill(this_group, procesor_thread_tid, SIGUSR1); 
    } else if (sz > my_udp_packet->buf_len) { 
     /* You could resize the buffer for the packet payload here if it is too small.*/ 
    } 
} 

Un udp_packet tendría que ser asignado para cada hilo, así como 1 para el puntero de intercambio. Si utiliza una cola para intercambiar, debe tener suficientes udp_packets para cada posición en la cola, dado que el puntero solo es una cola de longitud 1, solo necesita 1.

Si está utilizando un sistema POSIX, considere no usando una señal en tiempo real para la señalización porque hacen cola. El uso de una señal regular le permitirá tratar el hecho de recibir una señal varias veces igual a la señalización una sola vez hasta que se maneje la señal, mientras que las señales en tiempo real hacen cola. Despertarse periódicamente para verificar la cola también le permite manejar la posibilidad de que llegue la última señal justo después de verificar para ver si tenía paquetes nuevos, pero antes de llamar al pause para esperar una señal.

+0

Tiene razón en que lo que estoy buscando es una manera de decirle al sistema operativo que cambie su comportamiento al enviar nuevos datagramas UDP. Su algoritmo propuesto es mi "plan B" y cada vez tengo más la sensación de que será mi única solución simple ...:/ Muchas gracias por tomarse el tiempo para explicarlo en tales detalles, ¡fue una lectura interesante! PD: en la solución del "plan B" que tenía en mente, no estaba usando señales, sino variables de condición, que me gustan más porque no son asíncronas como las señales. – Nawak

+0

@Nawak: el uso de señales permite que el hilo de procesamiento no consuma tiempo de CPU mientras no haya trabajo por hacer. Si se trata de una máquina con un solo procesador, los dos hilos ya competirán por el tiempo de la CPU y hacer que el procesador ocupado espere ralentizará todo (probablemente más lento de lo que ya es). En su manejador de señal puede incrementar un int volátil que mira el subproceso del procesador cuando se despierta. Si la variable no se cambia, el hilo llama 'pausa 'nuevamente y vuelve a dormirse. – nategoose

+0

Las variables de condición no están "ocupadas esperando".El hilo que llama a pthread_cond_wait() está realmente durmiendo hasta que se despierta por el hilo del otro subproceso pthread_signal() o pthread_cond_broadcast() ... La ventaja adicional es que el mutex que protege la memoria compartida se puede usar con el cond_var y el subproceso despertado automáticamente posee el mutex de la memoria compartida en el despertador. – Nawak

5

Simplemente establezca el socket en no-bloqueo, y bucle en recv() hasta que devuelva < 0 con errno == EAGAIN. Luego, procese el último paquete que recibió, enjuague y repita.

+0

El problema es que la máquina receptora es mucho más lenta que la máquina remitente y mientras procesa el último datagrama en el búfer de socket, el búfer de socket puede volverse lleno nuevamente y perder el último datagrama real de la "transmisión". – Nawak

+0

@Nawak: para solucionarlo, puede establecer un búfer de recepción lo suficientemente grande en el zócalo, como explica valdo. – caf

3

Acepto "caf". Establezca el socket en un modo sin bloqueo.

Cuando reciba algo en el zócalo, lea en un bucle hasta que no quede nada más. Luego maneje el último datagrama leído.

Sólo una nota: se debe establecer un sistema de gran buffer de recepción para la toma

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok 
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize)); 
1

Otra idea es tener un proceso lector dedicado que no hace más que lazos en la toma y lee los paquetes entrantes en el buffer circular en la memoria compartida (tendrá que preocuparse por el orden correcto de escritura). Algo así como kfifo. El no bloqueo está bien aquí también. Los nuevos datos anulan los datos antiguos. Luego, otros procesos siempre tendrían acceso al último bloque en la cabecera de la cola y, todos los fragmentos anteriores aún no sobrescritos.

Puede ser demasiado complicado para un lector unidireccional simple, solo una opción.

+0

Sí, mi temor inicial era que sería la única solución. No me gusta, ya que desperdiciaría un poco de los ya escasos recursos en el receptor ... Esperaba un truco en el conector BSD que permitiera sobrescribir los mensajes antiguos en lugar de descartar los nuevos. De esta forma, el sistema operativo haría casi la misma cantidad de trabajo y seguiría habiendo recursos suficientes para la parte de procesamiento. – Nawak

+1

No, los enchufes no funcionan así. Otra opción es unir las actualizaciones en la máquina más rápida y publicar datos compuestos en un temporizador más grueso. –

1

Estoy bastante seguro de que se trata de un problema probadamente insoluble estrechamente relacionado con el Two Army Problem.

Puedo pensar en una solución sucia: establecer una conexión TCP de "banda lateral" de control que lleva el último paquete que también es una indicación de "transmisión final". De lo contrario, debe utilizar uno de los medios pragmáticos más generales que se indican en Engineering Approaches.

Cuestiones relacionadas