2012-05-01 20 views
7

Estoy tratando de utilizar Rx para leer desde una corriente de recepción TCPClient y analizar los datos en un IObservable de cadena, delimitado por la nueva línea "\ r \ n" La siguiente es la forma en que estoy recibiendo desde el socket stream ...Observable Red IO Parsing

var messages = new Subject<string>(); 

var functionReceiveSocketData = 
      Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int> 
      (client.Client.BeginReceive, client.Client.EndReceive); 

Func<byte[], int, byte[]> copy = (bs, n) => 
     { 
      var rs = new byte[buffer.Length]; 
      bs.CopyTo(rs, 0); 
      return rs; 
     }; 

Observable 
    .Defer(() => 
      { 
       var buffer = new byte[50]; 
       return 
        from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None) 
       select copy(buffer, n); 
      }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x))); 

Esto es lo que se me ocurrió para analizar la cadena. Esto actualmente no funciona ...

obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

El asunto del mensaje recibe el mensaje en trozos, así que estoy tratando de concat ellos y comprobar si la cadena concatenada contiene salto de línea, por lo tanto la señalización de la memoria intermedia para cerrar y la salida del buffer trozos. No estoy seguro de por qué no está funcionando. Parece que solo saco el primer pedazo de obsStrings.

Así que estoy buscando dos cosas. Me gustaría simplificar la lectura del flujo de io y eliminar el uso del tema de mensajes. En segundo lugar, me gustaría que mi análisis de cadenas funcione. He estado pirateando esto por un tiempo y no puedo encontrar una solución funcional. Soy un principiante con Rx.

EDIT: Aquí está el producto terminado después de que el problema fue resuelto ....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None) 
      .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray()) 
      .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
      .Where(x => x.EndsWith("\r\n")) 
      .Select(buffered => String.Join("", buffered)) 
      .Select(a => a.Replace("\n", "")); 

"ReceiveUntilCompleted" es una extensión del proyecto RXX.

Respuesta

3
messages 
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b) 
    .Where(x => x.EndsWith("\r\n")) 
+0

Descubrí que no necesitaba el .Buffer (1); al final. – TK3

+0

Se ha eliminado de la respuesta. – ronag

1

En lugar de Subscribe y utilizando el Subject, se puede tratar simplemente Select:

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

Ahora suponiendo que todo esto ha entrado en una nueva observables llamada messages, el siguiente problema es que en esta línea

var obsStrings = messages.Buffer<string,string>(() => 
       messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n")) 
      ); 

¡Está utilizando tanto Buffer como Scan, y está intentando hacer lo mismo en ambos! Tenga en cuenta que Buffer necesita un selector de cierre.

Lo que realmente quiere decir:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n"))) 
         .Select(buffered => String.Join(buffered)); 

que da un tamponada con respecto a cuándo cerrar la ventana (cuando contiene \ r \ n) y da Seleccione la cantidad tamponada para concatenar observable. Esto da como resultado una nueva observación de sus cadenas divididas.

Un problema es que todavía puede tener la nueva línea en el medio de un fragmento y esto causará problemas. Una idea simple es observar en los personajes en lugar de trozos de cuerda completos, tales como:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

A continuación, puede hacer messages.Where(c => c != '\r') para saltar \r y cambiar el tampón para:

var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n'))) 
         .Select(buffered => String.Join("", buffered)); 
+0

Verifica mi edición Aún obteniendo resultados extraños. – TK3

+0

Piensa que quieres 'messages.Where (x => x == '\ n')' not 'messages.Where (x => x! = '\ N')' en la llamada 'Buffer'. Es decir, romperá el búfer en cada nueva línea. – yamen

+0

Obteniendo trozos inconexos fuera de este oberservable.Me pregunto si se trata de un problema de enhebrado donde las pruebas se están realizando en otro hilo y el almacenamiento en búfer continúa o se rompe antes de tiempo, por lo que el almacenamiento en búfer se vuelve indeterminado. – TK3