2012-01-21 18 views
13

me di cuenta de que una toma PUB zeromq se amortigua todos los datos de salida si se trata de conectar, por ejemplotoma PUB ZeroMQ amortigua todo mi yendo de datos cuando se está conectando

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.connect("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.bind("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print sub.recv() 

El sub une después de esos mensajes, debería descartarse, porque PUB debería eliminar los mensajes si no hay nadie conectado a él. Pero en lugar de eliminar mensajes, almacena todos los mensajes.

a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
a message should not be dropped 
hi 

Como se puede ver, los que "un mensaje no se debe dejar caer" son amortiguados por el zócalo, una vez que se conecta, se les vaciar a la toma SUB. Si me enlace en el socket PUB y me conecto en el socket SUB, entonces funciona correctamente.

import zmq 
import time 
context = zmq.Context() 

# create a PUB socket 
pub = context.socket (zmq.PUB) 
pub.bind("tcp://127.0.0.1:5566") 
# push some message before connected 
# they should be dropped 
for i in range(5): 
    pub.send('a message should not be dropped') 

time.sleep(1) 

# create a SUB socket 
sub = context.socket (zmq.SUB) 
sub.connect("tcp://127.0.0.1:5566") 
sub.setsockopt(zmq.SUBSCRIBE, "") 

time.sleep(1) 

# this is the only message we should see in SUB 
pub.send('hi') 

while True: 
    print repr(sub.recv()) 

Y sólo se puede ver la salida

'hi' 

Este tipo de comportamiento extraño causar un problema, buffers todos los datos de un conector de conexión, tengo dos servidores, el servidor A publica datos al servidor B

Server A -- publish --> Server B 

Funciona bien si el servidor B se conecta. Pero, ¿qué sucede si inicio el Servidor A y no comienzo el Servidor B?

Como resultado, el conector PUB de conexión en el servidor A mantiene todos esos datos, el uso de la memoria se vuelve más y más alto.

Aquí está el problema, ¿este tipo de comportamiento es un error o una característica? Si es una función, ¿dónde puedo encontrar un documento que mencione este comportamiento? ¿Y cómo puedo detener la conexión de los buffers de PUB a todos los datos?

Gracias.

Respuesta

6

Ya sea que los bloques de zócalo o gotas mensajes depende del tipo de conector como se describe en la ZMQ::Socket documentation (énfasis a continuación es mío):

ZMQ :: HWM: Recuperar marca de agua alta

El ZMQ: : La opción HWM recuperará la marca de nivel máximo para el socket especificado . La marca de agua máxima es un límite estricto en el número máximo de de mensajes pendientes 0MQ debe hacer cola en la memoria para cualquier par único con el que se comunique el socket especificado.

Si se ha alcanzado este límite de la toma entrará un estado excepcional y dependiendo del tipo de socket, 0MQ tomará las medidas adecuadas como bloquear o dejar caer los mensajes enviados. Consulte las descripciones de socket individuales en ZMQ :: Socket para obtener detalles sobre la acción exacta tomada para cada tipo de socket.

El valor predeterminado ZMQ :: HWM de cero significa "sin límite".

Puede ver si se puede bloquear o dejar mirando a través de la documentación para el tipo de socket para ZMQ::HWM option action que, o bien ser Block o Drop.

La acción de ZMQ::PUB es Drop, así que si no está cayendo debe comprobar el HWM (Máximo) y el valor de prestar atención a la advertencia de que El valor ZMQ :: HWM predeterminado de cero significa “sin límite”, lo que significa que no entrará en un estado excepcional hasta que el sistema se quede sin memoria (en ese momento no sé cómo se comporta).

+0

Sé que puedo configurar el HWM para limitar el número de mensaje en el búfer. Pero no resuelve el problema, la forma en que PUB maneja el estado de HWM es soltar nuevos mensajes. Significa que si configura HWM, solo los mensajes principales se guardan en el búfer. Lo que estoy escribiendo es un sistema de transmisión de audio. Este tipo de comportamiento lo hace muy molesto de usar. Digamos que usted envía mensajes [1, 2, 3, 4] y luego HWM se configuró en 2, luego el zócalo almacenará en búfer [1, 2] para usted, todos los mensajes nuevos se descartan. Pero para la transmisión de audio, la parte más importante son los nuevos datos que se aproximan. ¿Hay alguna forma de ajustar cómo HWM elimina el mensaje? –

+0

Ah, entonces quieres decir que el comportamiento que te gustaría es que si HWM está configurado en 2 y envías [1, 2, 3, 4], entonces debería caer [1, 2] y mantener [3, 4], pero luego si enviaste 5, ¿debería dejar 3 y terminar con [4, 5]? No creo que ese comportamiento exista en ZMQ. – aculich

+0

Esto es muy interesante. Ciertamente, para algunas aplicaciones sería necesario tener la habilidad de soltar los mensajes "antiguos" (la telefonía IP viene a la mente como un ejemplo común). –

0

Así que bind() y connect() resultan en dos comportamientos diferentes. ¿Por qué no simplemente eliges cuál prefieres (parece que bind()) y usas eso?

De hecho, es una característica de ZeroMQ en general que almacena temporalmente los mensajes salientes hasta que se establece una conexión.

+0

Porque tengo varios nodos que desean publicar datos en un servidor conocido. Por supuesto, puedo enlazar en el lado PUB, pero como resultado, necesito N dirección para cada nodo, el servidor no sabe cuántos nodos habría. Creo que el enlace y la conexión no deberían afectar el comportamiento, una vez que se establece la conexión, no hay diferencia entre vincular y conectarse, entonces ¿por qué hacer la diferencia? No entiendo: S –

+0

Oh OK. Bueno, creo que ZeroMQ se está comportando como se esperaba y según lo diseñado, por lo que puede que solo tenga que consultar la conexión antes de enviar datos. –

+0

@JohnZwinck La elección de 'bind()' frente a 'connect()' no se basa en la preferencia, sino que debe basarse en cómo se usa. Lo está usando correctamente con 'bind()' en el servidor (el editor) y 'connect()' en el cliente (el suscriptor). Y no siempre almacena en búfer los mensajes salientes, sino que está determinado por el tipo de socket y el valor de la marca de nivel máximo como [se explica aquí con referencias a la documentación] (http://stackoverflow.com/a/8958699/462302) – aculich

0

Debería poder establecer una marca de agua alta en el zócalo utilizando la configuración de hwm en el zócalo pub. Le permite definir cuántos mensajes se guardan.

1

Configuran la opción HWM en el zócalo.

4

Creo que este comportamiento es la semántica de zmq_connect(). Esto es: cuando zmq_connect() devuelve éxito, luego la conexión se establece conceptualmente y, por lo tanto, su conexión-PUB inicia el mensaje de cola en lugar de soltar.

siguiente extracto de "ZMQ Guide" es una sugerencia para esto:

En teoría con tomas OMQ, no importa qué extremo se conecta, y que se une final. Sin embargo, con sockets PUB-SUB, si enlaza el socket SUB y conecta el socket PUB, el socket SUB puede recibir mensajes antiguos , es decir, mensajes enviados antes de que el SUB se inicie. Este es un artefacto de la forma en que se unen/conectan las obras. Lo mejor es enlazar el PUB y conectar el SUB, si es posible.

siguiente sección en zmq_connect() tiene algunos consejos, que se muestran a continuación:

diferencias clave a enchufes convencionales

En términos generales, enchufes convencionales presentan una interfaz síncrona a cualquiera orientado a la conexión fiable las secuencias de bytes (SOCK_STREAM) o los datagramas no confiables sin conexión (SOCK_DGRAM). En comparación, los sockets ØMQ presentan una abstracción de una cola de mensajes asíncrona , con la semántica de colas exacta dependiendo del tipo de socket en uso. Cuando los sockets convencionales transfieren flujos de bytes o datagramas discretos, los sockets ØMQ transfieren mensajes discretos.

sockets OMQ siendo asíncrona significa que los horarios de la configuración conexión física y derribar, vuelva a conectar y la entrega eficaz son transparente para el usuario y organizado por sí mismo OMQ. Además, los mensajes pueden ponerse en cola en caso de que un par no esté disponible para recibirlos.

0

Aquí hay un truco que puede ayudar ...

Fije su ZMQ::HWM a un número fijo, por ejemplo, 10.Tras la conexión, llame al método recv del zócalo del suscriptor en un bucle hasta que descarte todos los mensajes almacenados en el búfer y, a continuación, ENTONCES, inicie su bucle de recepción principal.

Cuestiones relacionadas