Estoy tratando de utilizar Rx para leer desde una corriente de recepción TCPClient y analizar los datos en un IObservable de cadena, delimitado por la nueva línea "\ r \ n" La siguiente es la forma en que estoy recibiendo desde el socket stream ...Observable Red IO Parsing
var messages = new Subject<string>();
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(client.Client.BeginReceive, client.Client.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[buffer.Length];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[50];
return
from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
select copy(buffer, n);
}).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));
Esto es lo que se me ocurrió para analizar la cadena. Esto actualmente no funciona ...
obsStrings = messages.Buffer<string,string>(() =>
messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
El asunto del mensaje recibe el mensaje en trozos, así que estoy tratando de concat ellos y comprobar si la cadena concatenada contiene salto de línea, por lo tanto la señalización de la memoria intermedia para cerrar y la salida del buffer trozos. No estoy seguro de por qué no está funcionando. Parece que solo saco el primer pedazo de obsStrings.
Así que estoy buscando dos cosas. Me gustaría simplificar la lectura del flujo de io y eliminar el uso del tema de mensajes. En segundo lugar, me gustaría que mi análisis de cadenas funcione. He estado pirateando esto por un tiempo y no puedo encontrar una solución funcional. Soy un principiante con Rx.
EDIT: Aquí está el producto terminado después de que el problema fue resuelto ....
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
.SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
.Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
.Where(x => x.EndsWith("\r\n"))
.Select(buffered => String.Join("", buffered))
.Select(a => a.Replace("\n", ""));
"ReceiveUntilCompleted" es una extensión del proyecto RXX.
Descubrí que no necesitaba el .Buffer (1); al final. – TK3
Se ha eliminado de la respuesta. – ronag