2011-11-22 8 views
13

¿Hay alguna estructura de datos .NET/combinación de clases que permita agregar datos de bytes al final de un búfer pero todas las miradas y lecturas son desde el principio, acortando el búfer cuando ¿Yo leo?búfer FIFO/Queue especializado en bytes de bytes

La clase MemoryStream parece hacer parte de esto, pero necesito mantener ubicaciones separadas para leer y escribir, y no descarta automáticamente los datos al principio después de su lectura.

Una respuesta ha sido publicada en respuesta a this question que es básicamente lo que estoy tratando de hacer, pero prefiero algo que pueda hacer E/S asincrónica en diferentes componentes del mismo proceso, al igual que una tubería normal o incluso una transmisión de red (primero necesito filtrar/procesar los datos).

+1

¿Hay algún problema al saltar hacia adelante y hacia atrás dentro de un búfer de lectura? – Ryan

+0

Solo lo que dije y tener que hacer un seguimiento de eso en comparación con el estilo de lectura, lectura, lectura, etc. de NetworkStream – Deanna

+0

¿Necesita leer y escribir matrices de diferentes tamaños? ¿No sería suficiente una cola de 'byte []' para ti? – svick

Respuesta

10

Voy a publicar una copia eliminada de alguna lógica que escribí para un proyecto en el trabajo una vez. La ventaja de esta versión es que funciona con una lista enlazada de datos almacenados en búfer y, por lo tanto, no es necesario guardar en memoria caché grandes cantidades de memoria y/o copiar memoria cuando se lee. además, su hilo es seguro y se comporta como un flujo de red, es decir: cuando lee cuando no hay datos disponibles: espere hasta que haya datos disponibles o tiempo de espera excedido. Además, al leer x cantidades de bytes y solo hay cantidades de bytes, devuelve después de leer todos los bytes. ¡Espero que esto ayude!

public class SlidingStream : Stream 
{ 
    #region Other stream member implementations 

    ... 

    #endregion Other stream member implementations 

    public SlidingStream() 
    { 
     ReadTimeout = -1; 
    } 

    private readonly object _writeSyncRoot = new object(); 
    private readonly object _readSyncRoot = new object(); 
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>(); 
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim(); 

    public int ReadTimeout { get; set; } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     if (_dataAvailableResetEvent.Wait(ReadTimeout)) 
      throw new TimeoutException("No data available"); 

     lock (_readSyncRoot) 
     { 
      int currentCount = 0; 
      int currentOffset = 0; 

      while (currentCount != count) 
      { 
       ArraySegment<byte> segment = _pendingSegments.First.Value; 
       _pendingSegments.RemoveFirst(); 

       int index = segment.Offset; 
       for (; index < segment.Count; index++) 
       { 
        if (currentOffset < offset) 
        { 
         currentOffset++; 
        } 
        else 
        { 
         buffer[currentCount] = segment.Array[index]; 
         currentCount++; 
        } 
       } 

       if (currentCount == count) 
       { 
        if (index < segment.Offset + segment.Count) 
        { 
         _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index)); 
        } 
       } 

       if (_pendingSegments.Count == 0) 
       { 
        _dataAvailableResetEvent.Reset(); 

        return currentCount; 
       } 
      } 

      return currentCount; 
     } 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     lock (_writeSyncRoot) 
     { 
      byte[] copy = new byte[count]; 
      Array.Copy(buffer, offset, copy, 0, count); 

      _pendingSegments.AddLast(new ArraySegment<byte>(copy)); 

      _dataAvailableResetEvent.Set(); 
     } 
    } 
} 
+1

Se ve bien, y era la forma en que me dirigía. Lo intentaré esta noche. – Deanna

+0

Me parece que esto se bloqueará si intenta leer los datos cuando no hay ninguno disponible. – svick

+0

@svick - Absolutamente correcto, es solo un borrador, no hay validación de argumento, etc. El manualResetEvent está ahí por esa única razón, me olvidé de esperar al principio del método de lectura. Corregido ahora. Gracias por el aviso – Polity

1

El código puede ser más simple que en la respuesta aceptada. No es necesario utilizar un bucle for .:

/// <summary> 
/// This class is a very fast and threadsafe FIFO buffer 
/// </summary> 
public class FastFifo 
{ 
    private List<Byte> mi_FifoData = new List<Byte>(); 

    /// <summary> 
    /// Get the count of bytes in the Fifo buffer 
    /// </summary> 
    public int Count 
    { 
     get 
     { 
      lock (mi_FifoData) 
      { 
       return mi_FifoData.Count; 
      } 
     } 
    } 

    /// <summary> 
    /// Clears the Fifo buffer 
    /// </summary> 
    public void Clear() 
    { 
     lock (mi_FifoData) 
     { 
      mi_FifoData.Clear(); 
     } 
    } 

    /// <summary> 
    /// Append data to the end of the fifo 
    /// </summary> 
    public void Push(Byte[] u8_Data) 
    { 
     lock (mi_FifoData) 
     { 
      // Internally the .NET framework uses Array.Copy() which is extremely fast 
      mi_FifoData.AddRange(u8_Data); 
     } 
    } 

    /// <summary> 
    /// Get data from the beginning of the fifo. 
    /// returns null if s32_Count bytes are not yet available. 
    /// </summary> 
    public Byte[] Pop(int s32_Count) 
    { 
     lock (mi_FifoData) 
     { 
      if (mi_FifoData.Count < s32_Count) 
       return null; 

      // Internally the .NET framework uses Array.Copy() which is extremely fast 
      Byte[] u8_PopData = new Byte[s32_Count]; 
      mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count); 
      mi_FifoData.RemoveRange(0, s32_Count); 
      return u8_PopData; 
     } 
    } 

    /// <summary> 
    /// Gets a byte without removing it from the Fifo buffer 
    /// returns -1 if the index is invalid 
    /// </summary> 
    public int PeekAt(int s32_Index) 
    { 
     lock (mi_FifoData) 
     { 
      if (s32_Index < 0 || s32_Index >= mi_FifoData.Count) 
       return -1; 

      return mi_FifoData[s32_Index]; 
     } 
    } 
} 
+0

Eso es esencialmente lo mismo que la pregunta vinculada, que no cumple con los deseos asincrónicos o de bloqueo. Gracias sin embargo. – Deanna

+0

Bien, pero ese código no es tan elegante y no es seguro para subprocesos. Puedes hacerlo con 6 líneas en lugar de necesitar 16 líneas. – Elmue