2012-04-12 14 views
8

Como parte de nuestra aplicación (en la producción de alrededor de 4 meses) tenemos un flujo de datos procedentes de un dispositivo externo que convertimos a un IObservablemétodo preferido para generar un IObservable <String> de una corriente de

Hasta ahora hemos estado utilizando lo siguiente para generarlo, y ha estado funcionando bastante bien.

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return Observable 
      .Create<string>(observer => Scheduler.ThreadPool 
      .Schedule(() => ReadLoop(streamReader, observer))); 
} 

private void ReadLoop(StreamReader reader, IObserver<string> observer) 
{ 
    while (true) 
    { 
     try 
     { 
      var line = reader.ReadLine(); 
      if (line != null) 
      { 
       observer.OnNext(line); 
      } 
      else 
      { 
       observer.OnCompleted(); 
       break; 
      } 
     } 
     catch (Exception ex) 
     { 
      observer.OnError(ex); 
      break; 
     } 
    } 
} 

Anoche me preguntó si había una manera de utilizar la sintaxis yield return para lograr el mismo resultado y se le ocurrió esto:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    var streamReader = new StreamReader(inputStream); 
    return ReadLoop(streamReader) 
      .ToObservable(Scheduler.ThreadPool); 
} 

private IEnumerable<string> ReadLoop(StreamReader reader) 
{ 
    while (true) 
    { 
     var line = reader.ReadLine(); 
     if (line != null) 
     { 
      yield return line; 
     } 
     else 
     { 
      yield break; 
     } 
    } 
} 

Parece que funciona bastante bien y es mucho más limpio, pero me preguntaba si había pros o contras de una manera sobre la otra, o si había una mejor manera completamente.

+2

Pro: 'return' rendimiento apoya perezoso carga/tardía de su colección. –

+2

Con: cuando se lanza una excepción, no llama a OnException, simplemente se dispara –

+0

Supongo que depende si no te importa grabar un hilo para hacer tu ciclo de lectura, que se reduce a la cantidad de dispositivos que necesitas admitir. Escribí un AsyncTextReader que era él mismo Observable para hacer algo similar, pero a escala. Seguramente en estos días usted podría ESPERAR algo ... – piers7

Respuesta

11

Creo que tienes una buena idea allí (activar Stream en Enumerable y IObservable). Sin embargo, el código Enumberable puede ser mucho más limpio:

IEnumerable<string> ReadLines(Stream stream) 
{ 
    using (StreamReader reader = new StreamReader(stream)) 
    { 
     while (!reader.EndOfStream) 
      yield return reader.ReadLine(); 
    } 
} 

Y luego lo observable:

IObservable<string> ObserveLines(Stream inputStream) 
{ 
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool); 
} 

Esto es más corto, más fácil de leer, y debidamente dispone de los arroyos. También es flojo.

La extensión ToObservable se encarga de capturar los eventos OnNext (nuevas líneas), así como el evento OnCompleted (final del enumerable) y OnError.

+0

Agradable, muy limpio. Tendré que intentarlo mañana. Mi única preocupación es que puedo obtener un nulo final como el último elemento en el Observable, pero eso es fácil de filtrar con .Where (line => line! = Null) – baralong

2

No tengo el código a mano, pero he aquí cómo hacerlo async pre-async CTP.

[Nota para los lectores desnatada: hay necesidad de preocuparse si usted no necesita escalar mucho]

Crear una aplicación AsyncTextReader, que es en sí mismo observable. El controlador toma una secuencia y realiza un BeginRead (256bytes) en la secuencia, pasándose a sí mismo como continuación y luego regresando.

Cuando se ingresa la continuación, llame a EndRead y agregue los bytes devueltos en un pequeño búfer en la clase. Repita esto hasta que el buffer contenga una o más secuencias de fin de línea (según TextWriter). Cuando esto suceda, envíe esos bits del buffer como una cadena a través de la interfaz Observable, y repita.

Cuando hayas terminado, indica OnComplete, etc ... (y desecha la transmisión). Si obtiene una excepción lanzada desde EndReadByte en su continuación, recójala y transfiérala a la interfaz OnError.

código de llamada a continuación, se ve así:

IObservable = new AsyncTextReader (corriente);

Esto escala bien. Solo necesita asegurarse de no hacer nada demasiado tonto con el manejo del buffer.

seudo código:

public ctor(Stream stream){ 
    this._stream = stream; 
    BeginRead(); 
    return; 
} 

private void BeginRead(){ 
    // kick of async read and return (synchronously) 
    this._stream.BeginRead(_buffer,0,256,EndRead,this); 
} 

private void EndRead(IAsyncResult result){ 
    try{ 
     // bytesRead will be *up to* 256 
     var bytesRead = this._stream.EndRead(result); 
     if(bytesRead < 1){ 
      OnCompleted(); 
      return; 
     } 
     // do work with _buffer, _listOfBuffers 
     // to get lines out etc... 
     OnNext(aLineIFound); // times n 
     BeginRead(); // go round again 
    }catch(Exception err){ 
     OnException(err); 
    } 
} 

Ok, esta es la APM, y algo que sólo una madre podría amar. Espero profundamente la alternativa.

ps: si el lector debe cerrar la secuencia es una pregunta interesante. Yo digo que no, porque no lo creó.

0

Con asíncrono/esperan el apoyo, el siguiente es más probable que su mejor opción:

IObservable<string> ObserveStringStream(Stream inputStream) 
{ 
    return Observable.Using(() => new StreamReader(inputStream), 
     sr => Observable.Create<string>(async (obs, ct) => 
     { 
      while (true) 
      { 
       ct.ThrowIfCancellationRequested(); 
       var line = await sr.ReadLineAsync().ConfigureAwait(false); 
       if (line == null) 
        break; 
       obs.OnNext(line); 
      } 
      obs.OnCompleted(); 
    })); 
} 
Cuestiones relacionadas