2010-05-06 19 views
12

Tengo un objeto Stream que ocasionalmente obtiene algunos datos, pero a intervalos impredecibles. Los mensajes que aparecen en la transmisión están bien definidos y declaran el tamaño de su carga útil por adelantado (el tamaño es un entero de 16 bits contenido en los primeros dos bytes de cada mensaje).leyendo continuamente de una secuencia?

Me gustaría tener una clase StreamWatcher que detecte cuándo el Stream tiene algunos datos en él. Una vez que lo haga, me gustaría que se genere un evento para que una instancia de StreamProcessor suscrita pueda procesar el nuevo mensaje.

¿Se puede hacer esto con eventos C# sin utilizar Threads directamente? Parece que debería ser sencillo, pero no puedo entender bien el camino correcto para diseñar esto.

Respuesta

0

Sí, esto se puede hacer. Use el método Stream.BeginRead sin bloqueo con un AsyncCallback. La devolución de llamada se llama de manera asíncrona cuando los datos están disponibles. En la devolución de llamada, llame al Stream.EndRead para obtener los datos, y llame de nuevo al Stream.BeginRead para obtener el siguiente fragmento de datos. Buffer datos entrantes en una matriz de bytes que es lo suficientemente grande como para contener el mensaje. Una vez que la matriz de bytes está llena (pueden necesitarse varias llamadas de devolución de llamada), plantee el evento. Luego lea el siguiente tamaño del mensaje, cree un nuevo buffer, repita, done.

+0

Dijo sin usar hilos !!! =) – Nayan

+0

@Nayan: Asynchrony * es * multithreading. Este problema es inherentemente uno de multihilo. Sospecho que lo que significa OP es que no quiere crear los hilos por sí mismo. –

+0

@Nayan: Supongo que el OP está buscando una solución que no cree explícitamente un nuevo subproceso y utilice el método de bloqueo de lectura, pero para una solución asincrónica sin bloqueo. No puede tener asincronicidad sin hilos. – dtb

1

El enfoque normal es utilizar el .NET asynchronous programming pattern expuesto por Stream. Esencialmente, comienza a leer asincrónicamente llamando al Stream.BeginRead, pasándole un buffer byte[] y un método de devolución de llamada que se invocará cuando se lean los datos de la transmisión. En el método de devolución de llamada, llame al Stream.EndRead, pasando el argumento IAsncResult que se le dio a su devolución de llamada. El valor de retorno de EndRead le indica cuántos bytes se leyeron en el búfer.

Una vez que haya recibido los primeros bytes de esta manera, puede esperar el resto del mensaje (si no obtuvo suficientes datos la primera vez) llamando al BeginRead nuevamente. Una vez que tenga todo el mensaje, puede plantear el evento.

11

Cuando no decimos utiliza hilos directamente, supongo que todavía desea utilizarlos indirectamente a través de llamadas asíncronas, de lo contrario esto no sería muy útil.

Todo lo que necesita hacer es ajustar los métodos asincrónicos del Stream y almacenar el resultado en un búfer. En primer lugar, vamos a definir la parte de eventos de la especificación:

public delegate void MessageAvailableEventHandler(object sender, 
    MessageAvailableEventArgs e); 

public class MessageAvailableEventArgs : EventArgs 
{ 
    public MessageAvailableEventArgs(int messageSize) : base() 
    { 
     this.MessageSize = messageSize; 
    } 

    public int MessageSize { get; private set; } 
} 

Ahora, lea un entero de 16 bits de la corriente de forma asíncrona e informar de vuelta cuando esté listo:

public class StreamWatcher 
{ 
    private readonly Stream stream; 

    private byte[] sizeBuffer = new byte[2]; 

    public StreamWatcher(Stream stream) 
    { 
     if (stream == null) 
      throw new ArgumentNullException("stream"); 
     this.stream = stream; 
     WatchNext(); 
    } 

    protected void OnMessageAvailable(MessageAvailableEventArgs e) 
    { 
     var handler = MessageAvailable; 
     if (handler != null) 
      handler(this, e); 
    } 

    protected void WatchNext() 
    { 
     stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback), 
      null); 
    } 

    private void ReadCallback(IAsyncResult ar) 
    { 
     int bytesRead = stream.EndRead(ar); 
     if (bytesRead != 2) 
      throw new InvalidOperationException("Invalid message header."); 
     int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0]; 
     OnMessageAvailable(new MessageAvailableEventArgs(messageSize)); 
     WatchNext(); 
    } 

    public event MessageAvailableEventHandler MessageAvailable; 
} 

Creo que eso es todo. Esto supone que cualquiera que sea la clase que maneje el mensaje también tiene acceso al Stream y está preparado para leerlo, de forma síncrona o asincrónica, según el tamaño del mensaje en el evento. Si desea que la clase del observador realmente lea el mensaje completo, tendrá que agregar más código para hacerlo.

+0

+1 para la implementación que demuestra que mi teoría de BeginRead no es la solución correcta. Su implementación despejó esa duda. – Nayan

+0

Glorioso! Parece que hará el truco, y lo intentaré mañana. –

+0

+1. Nitpicking: AFAIK a Stream.Read puede devolver menos que contar bytes, siempre y cuando devuelva al menos un byte (si no se alcanza el final). Entonces, si '(bytesRead! = 2)' no debe lanzar una excepción sino BeginRead nuevamente hasta que se hayan leído dos bytes. – dtb

1

¿No está utilizando Stream.BeginRead() lo mismo que usar el método síncrono Stream.Read() en un hilo separado?

Cuestiones relacionadas