2009-09-25 15 views
8

estoy usando una biblioteca que requiere que proporcione un objeto que implementa esta interfaz:¿Hay una en secuencia de memoria que bloquea como una secuencia de archivo

public interface IConsole { 
    TextWriter StandardInput { get; } 
    TextReader StandardOutput { get; } 
    TextReader StandardError { get; } 
} 

lectores del objeto luego son utilizados por la biblioteca con:

IConsole console = new MyConsole(); 
int readBytes = console.StandardOutput.Read(buffer, 0, buffer.Length); 

Normalmente, la clase que implementa IConsole tiene la secuencia StandardOutput como procedente de un proceso externo. En ese caso, las llamadas console.StandardOutput.Read funcionan bloqueando hasta que haya algunos datos escritos en la transmisión StandardOutput.

Lo que intento hacer es crear una implementación de IConsole de prueba que use MemoryStreams y echo lo que aparezca en la entrada StandardInput en la entrada StandardInput. Probé:

MemoryStream echoOutStream = new MemoryStream(); 
StandardOutput = new StreamReader(echoOutStream); 

Pero el problema con esto es la console.StandardOutput.Read retornará 0 en lugar de bloques hasta que hay algunos datos. ¿Hay alguna forma de que pueda bloquear un MemoryStream si no hay datos disponibles o hay un flujo de memoria diferente que pueda usar?

+1

Realmente no debería leer de una secuencia de salida. –

Respuesta

7

Al final encontré una manera fácil de hacerlo al heredar de MemoryStream y asumir los métodos de lectura y escritura.

public class EchoStream : MemoryStream { 

    private ManualResetEvent m_dataReady = new ManualResetEvent(false); 
    private byte[] m_buffer; 
    private int m_offset; 
    private int m_count; 

    public override void Write(byte[] buffer, int offset, int count) { 
     m_buffer = buffer; 
     m_offset = offset; 
     m_count = count; 
     m_dataReady.Set(); 
    } 

    public override int Read(byte[] buffer, int offset, int count) { 
     if (m_buffer == null) { 
      // Block until the stream has some more data. 
      m_dataReady.Reset(); 
      m_dataReady.WaitOne();  
     } 

     Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, (count < m_count) ? count : m_count); 
     m_buffer = null; 
     return (count < m_count) ? count : m_count; 
    } 
} 
+1

Tiene una condición de carrera en 'Read()'. Si 'Write()' es llamado por otro hilo entre la comprobación nula del buffer y 'm_dataReady.Reset()', es posible que tenga que esperar para siempre si el servidor no va a enviar datos nuevamente. En la mayoría de los protocolos de solicitud/respuesta, esto crearía un bloqueo muerto. Le sugiero que use un evento automático en su lugar. –

+0

Bastante justo. Estoy de acuerdo en que vale la pena hacer un check in. Huelga decir que he tenido el código anterior en producción en un servicio público de SSH durante 5 años y nunca he suspendido el servicio, así que sospecho que es una condición de muy baja probabilidad. – sipwiz

+0

@sipwiz esta es una buena respuesta. Sin embargo, el uso de Array.copy no permite que la lectura funcione correctamente. No admitirá copias compensadas. Necesita cambiar a ** Buffer.BlockCopy (m_buffer, 0, buffer, offset, m_count); ** esto también es más rápido en la mayoría de los sistemas. –

8

Inspirado por su respuesta, aquí está mi multi-hilo, versión multi-escritura:

public class EchoStream : MemoryStream 
{ 
    private readonly ManualResetEvent _DataReady = new ManualResetEvent(false); 
    private readonly ConcurrentQueue<byte[]> _Buffers = new ConcurrentQueue<byte[]>(); 

    public bool DataAvailable{get { return !_Buffers.IsEmpty; }} 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     _Buffers.Enqueue(buffer); 
     _DataReady.Set(); 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     _DataReady.WaitOne(); 

     byte[] lBuffer; 

     if (!_Buffers.TryDequeue(out lBuffer)) 
     { 
      _DataReady.Reset(); 
      return -1; 
     } 

     if (!DataAvailable) 
      _DataReady.Reset(); 

     Array.Copy(lBuffer, buffer, lBuffer.Length); 
     return lBuffer.Length; 
    } 
} 

Con la versión que debe leer la secuencia en Write, sin ningún tipo de escritura consecutivamente ser posible. Mi versión almacena temporalmente cualquier búfer escrito en una ConcurrentQueue (es bastante sencillo cambiarlo a una cola simple y bloquearlo)

+0

esto es increíble, sin embargo, hay un error en el método Write, '_Buffers.Enqueue (buffer);' debe ser reemplazado por '_Buffers.Enqueue (buffer.Take (count) .ToArray());' y luego está realmente funcionando, bloqueando e intercambiando datos entre hilos! ¡Gracias! –

Cuestiones relacionadas