2009-02-07 8 views
6

Nota: Permítanme disculparme por la longitud de esta pregunta, tuve que poner mucha información en ella. Espero que eso no cause que mucha gente simplemente lo mire y haga suposiciones. Por favor, lea en su totalidad. Gracias.¿Cuál es un buen método para manejar flujos de E/S de red basados ​​en línea?

Tengo un flujo de datos que entra por un socket. Esta información está orientada a la línea.

Estoy usando el APM (método de programación Async) de .NET (BeginRead, etc.). Esto imposibilita el uso de E/S basadas en flujo porque Async I/O está basado en el buffer. Es posible volver a empaquetar los datos y enviarlos a una secuencia, como una secuencia de memoria, pero también hay problemas.

El problema es que mi flujo de entrada (que no tengo control) no me da ninguna información sobre la duración de la transmisión. Es simplemente una corriente de líneas de nueva línea con este aspecto:

COMMAND\n 
...Unpredictable number of lines of data...\n 
END COMMAND\n 
....repeat.... 

Por lo tanto, el uso de APM, y puesto que no sé cuánto tiempo cualquier conjunto de datos dado será, es probable que los bloques de datos se cruzarán búfer límites que requieren lecturas múltiples, pero esas lecturas múltiples también abarcarán múltiples bloques de datos.

Ejemplo:

Byte buffer[1024] = ".................blah\nThis is another l" 
[another read] 
        "ine\n.............................More Lines..." 

Mi primer pensamiento fue utilizar un StringBuilder y simplemente añadir las líneas de tampón a la SB. Esto funciona hasta cierto punto, pero me resultó difícil extraer bloques de datos. Intenté usar un StringReader para leer los datos nuevos, pero no había forma de saber si obtenía una línea completa o no, ya que StringReader devuelve una línea parcial al final del último bloque agregado, seguido de devolver nulo posteriormente. No hay forma de saber si lo que se devolvió fue una línea de datos completa.

Ejemplo:

// Note: no newline at the end 
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp.."); 
StringReader sr = new StringReader(sb); 
string s = sr.ReadLine(); // returns "This is a line" 
s = sr.ReadLine();  // returns "This is incomp.." 

Lo que es peor, es que si sigo añadiendo a los datos, los tampones se hacen más grandes y más grande, y ya que esto podría funcionar durante semanas o meses a la vez que no es un buen solución.

Mi siguiente pensamiento fue eliminar bloques de datos del SB a medida que los leía. Esto requirió escribir mi propia función ReadLine, pero luego me quedé atrapado bloqueando los datos durante las lecturas y escrituras. Además, los bloques de datos más grandes (que pueden consistir en cientos de lecturas y megabytes de datos) requieren escanear todo el búfer en busca de nuevas líneas. No es eficiente y bastante feo.

Estoy buscando algo que tenga la simplicidad de un StreamReader/Writer con la conveniencia de async I/O.

Mi siguiente pensamiento fue utilizar un MemoryStream, y escribir los bloques de datos en una secuencia de memoria luego adjuntar un StreamReader a la secuencia y usar ReadLine, pero nuevamente tengo problemas para saber si la última lectura en el buffer es una línea completa o no, además es aún más difícil eliminar los datos "obsoletos" de la transmisión.

También pensé en usar un hilo con lecturas sincrónicas. Esto tiene la ventaja de que al usar un StreamReader, siempre devolverá una línea completa desde una ReadLine(), excepto en situaciones de conexión interrumpidas. Sin embargo, esto tiene problemas para cancelar la conexión, y ciertos tipos de problemas de red pueden resultar en enchufes bloqueados durante un período prolongado. Estoy usando async IO porque no quiero atar un hilo durante la vida del programa que bloquea la recepción de datos.

La conexión es de larga duración.Y los datos continuarán fluyendo con el tiempo. Durante la conexión inicial, hay un gran flujo de datos, y una vez que se realiza el flujo, el socket permanece abierto esperando actualizaciones en tiempo real. No sé exactamente cuándo el flujo inicial ha "terminado", ya que la única forma de saber es que ya no se envían más datos de inmediato. Esto significa que no puedo esperar a que finalice la carga de datos inicial antes del procesamiento, estoy atascado procesando "en tiempo real" cuando entra.

Entonces, ¿alguien puede sugerir un buen método para manejar esta situación? de una manera que no sea demasiado complicada? Realmente quiero que esto sea lo más simple y elegante posible, pero sigo encontrando soluciones cada vez más complicadas debido a todos los casos extremos. Supongo que lo que quiero es algún tipo de FIFO en el que pueda agregar más datos fácilmente y, al mismo tiempo, extraer datos que coincidan con ciertos criterios (es decir, cadenas terminadas en nueva línea).

+0

pensé que esto era un problema interesante también, así que escribí un post acerca de la solución con el CCR que se puede encontrar en http: //iodyner.spaces.live.com, si le interesa ... –

Respuesta

5

Esa es una pregunta bastante interesante. La solución para mí en el pasado ha sido utilizar un hilo separado con operaciones sincrónicas, como usted propone. (Logré sortear la mayoría de los problemas con el bloqueo de sockets usando bloqueos y muchos manejadores de excepciones.) Aún así, el uso de las operaciones asíncronas incorporadas suele ser aconsejable, ya que permite E/S asíncronas de verdadero nivel del sistema operativo, por lo que entiendo tu punto.

Bueno, he ido y he escrito una clase para lograr lo que creo que necesitas (de una manera relativamente limpia, diría yo). Déjame saber lo que piensas.

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Text; 

public class AsyncStreamProcessor : IDisposable 
{ 
    protected StringBuilder _buffer; // Buffer for unprocessed data. 

    private bool _isDisposed = false; // True if object has been disposed 

    public AsyncStreamProcessor() 
    { 
     _buffer = null; 
    } 

    public IEnumerable<string> Process(byte[] newData) 
    { 
     // Note: replace the following encoding method with whatever you are reading. 
     // The trick here is to add an extra line break to the new data so that the algorithm recognises 
     // a single line break at the end of the new data. 
     using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine)) 
     { 
      // Read all lines from new data, returning all but the last. 
      // The last line is guaranteed to be incomplete (or possibly complete except for the line break, 
      // which will be processed with the next packet of data). 
      string line, prevLine = null; 
      while ((line = newDataReader.ReadLine()) != null) 
      { 
       if (prevLine != null) 
       { 
        yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine; 
        _buffer = null; 
       } 
       prevLine = line; 
      } 

      // Store last incomplete line in buffer. 
      if (_buffer == null) 
       // Note: the (* 2) gives you the prediction of the length of the incomplete line, 
       // so that the buffer does not have to be expanded in most/all situations. 
       // Change it to whatever seems appropiate. 
       _buffer = new StringBuilder(prevLine, prevLine.Length * 2); 
      else 
       _buffer.Append(prevLine); 
     } 
    } 

    public void Dispose() 
    { 
     Dispose(true); 
     GC.SuppressFinalize(this); 
    } 

    private void Dispose(bool disposing) 
    { 
     if (!_isDisposed) 
     { 
      if (disposing) 
      { 
       // Dispose managed resources. 
       _buffer = null; 
       GC.Collect(); 
      } 

      // Dispose native resources. 

      // Remember that object has been disposed. 
      _isDisposed = true; 
     } 
    } 
} 

Una instancia de esta clase debe ser creado para cada NetworkStream y la función del proceso debería ser llamado cada vez que se reciben nuevos datos (en el método de devolución de llamada para BeginRead, antes de llamar a la siguiente BeginRead me imagino).

Nota: Solo he verificado este código con datos de prueba, no datos reales transmitidos a través de la red. Sin embargo, no anticiparía ninguna diferencia ...

También, una advertencia de que la clase, por supuesto, no es segura para subprocesos, pero mientras BeginRead no se ejecute de nuevo hasta después de que se hayan procesado los datos actuales (como supongo que estás haciendo), no debería haber ningún problema.

Espero que esto funcione para usted. Avíseme si quedan problemas pendientes e intentaré modificar la solución para resolverlos. (Esto muy probablemente podría haber alguna sutileza de la pregunta que me echaba de menos, a pesar de leer con cuidado!)

+0

Esta es una solución interesante. Yo también he encontrado que los iteradores son útiles, pero esta no fue una solución que mi mente hubiera ideado. Me gusta. –

+1

¿Puedes explicar por qué necesitas implementar IDispose? Me han dicho que llamar a GC.Collect() es una mala práctica y puede dar como resultado un bajo rendimiento. ¿Le preocupan las asignaciones rápidas en un corto período de tiempo agotando el montón? –

+0

Sí, los iteradores son útiles. En este caso, podrías hacerlo con una lista genérica, aunque puede que no se vea tan bien por supuesto. Si usted quiere tratar con el resultado como una Lista/Array, es trivial para convertir a esos tipos de todos modos, y la implementación es aún más simple. – Noldorin

0

Lo que estás explicando en tu pregunta, me recuerda mucho a las cadenas ASCIZ. (link text). Eso puede ser un comienzo útil.

Tuve que escribir algo similar a esto en la universidad para un proyecto en el que estaba trabajando. Desgraciadamente, tenía control sobre el socket de envío, así que inserté una longitud de campo de mensaje como parte del protocolo. Sin embargo, creo que un enfoque similar puede beneficiarlo.

Como me acerqué a mi solución fue que enviaría algo como 5HELLO, así que primero vería 5, y sabría que tenía una longitud de mensaje de 5, y por lo tanto el mensaje que necesitaba eran 5 caracteres. Sin embargo, si en mi lectura asíncrona, solo obtuve 5HE, vería que tengo longitud de mensaje 5, pero solo pude leer 3 bytes del cable (supongamos caracteres ASCII). Debido a esto, sabía que me faltaban algunos bytes, y almacenaba lo que tenía en el buffer de fragmentos. Tenía un buffer de fragmento por socket, por lo tanto, evité cualquier problema de sincronización. El proceso aproximado es

  1. leer del socket en una matriz de bytes, ficha cuántos bytes se leyó
  2. escanear a través de byte a byte, hasta que encuentre un carácter de nueva línea (esto se vuelve muy compleja si no está recibiendo caracteres ASCII, pero caracteres que podrían ser múltiples bytes, está en su propia cuenta)
  3. Convierta su buffer de fragmentación en una cadena, y anexe su buffer de lectura hasta la nueva línea. Coloque esta cadena como un mensaje completo en una cola o su propio delegado para ser procesado. (Puede optimizar estos búferes teniendo su escritura de socket de lectura en la misma matriz de bytes mientras está fragmentado, pero eso es más difícil de explicar)
  4. Continúe repitiendo, cada vez que encontremos una nueva línea, cree una cadena desde el byte organizar desde una posición de inicio/final grabada y soltar en la cola/delegar para el procesamiento.
  5. Una vez que llegamos al final de nuestro buffer de lectura, copia todo lo que quede en el buffer de frag.
  6. Llame a BeginRead en el zócalo, que saltará al paso 1. cuando haya datos disponibles en el zócalo.

Luego utiliza otro subproceso para leer que está en cola de mensajes de incommign, o simplemente deje que el Threadpool lo maneje utilizando delegados. Y haz el procesamiento de datos que tengas que hacer. Alguien me va a corregir si estoy equivocado, pero hay muy pocos problemas de sincronización de hilos con esto, ya que solo puedes leer o esperar a leer desde el socket en cualquier momento, así que no te preocupes por los bloqueos (excepto si eres poblando una cola, utilicé delegados en mi implementación). Hay algunos detalles que deberá resolver por su cuenta, como qué tan grande de un buffer de fragmentación dejar, si recibe 0 nuevas líneas cuando lee, todo el mensaje debe ser anexado al buffer de fragmentos sin sobreescribir cualquier cosa. Creo que al final me dieron aproximadamente entre 700 y 800 líneas de código, pero eso incluía la configuración de la conexión, la negociación para el cifrado y algunas otras cosas más.

Esta configuración funcionó muy bien para mí; Pude realizar hasta 80Mbps en una LAN ethernet de 100Mbps usando esta implementación con un opteron de 1.8Ghz, incluido el procesamiento de encriptación. Y como está vinculado al socket, el servidor escalará ya que se pueden trabajar múltiples sockets al mismo tiempo. Si necesita artículos procesados ​​en orden, necesitará usar una cola, pero si el pedido no importa, los delegados le darán un rendimiento muy escalable fuera del grupo de temas.

Espero que esto ayude, no pretende ser una solución completa, sino una dirección para comenzar a buscar.

* Solo una nota, mi implementación se redujo simplemente en el nivel de bytes y el cifrado admitido, utilicé caracteres para mi ejemplo para que sea más fácil de visualizar.

+0

Sí, he implementado un enfoque similar a este, pero no me gusta. Es demasiado complicado y complejo para mi gusto, por eso estoy pidiendo sugerencias aquí. Me gusta el enfoque de Noldorin, tiene la elgancia y la reutilización del código de marco existente que deseo. –

Cuestiones relacionadas