2008-12-16 14 views
17

Estoy desarrollando un programa que envía continuamente una secuencia de datos en segundo plano y quiero permitir que el usuario establezca un límite para los límites de carga y descarga.Disminución del ancho de banda en C#

He leído en los token bucket y leaky bucket alghorhithms, y aparentemente este último parece ajustarse a la descripción ya que esto no es una cuestión de maximizar el ancho de banda de red, sino más bien ser lo más discreto posible.

Sin embargo, estoy un poco inseguro de cómo implementaría esto. Un enfoque natural es extender la clase Stream abstracta para que sea más simple extender el tráfico existente, pero ¿no requeriría la participación de hilos adicionales para enviar los datos mientras se recibe simultáneamente (cubo con fugas)? Cualquier sugerencia sobre otras implementaciones que hagan lo mismo sería apreciada.

Además, aunque puedo modificar la cantidad de datos que recibe el programa, ¿qué tan bien funciona el estrangulamiento del ancho de banda en el nivel C#? ¿La computadora aún recibirá los datos y simplemente los guardará, cancelando efectivamente el efecto de aceleración o esperará hasta que solicite recibir más?

EDITAR: Estoy interesado en la regulación de los datos entrantes y salientes, donde no tengo control sobre el extremo opuesto de la secuencia.

Respuesta

1

Se me ocurrió una implementación diferente de la clase ThrottledStream mencionada por arul. Mi versión utiliza un WaitHandle y un temporizador con un intervalo de 1 s:

public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
{ 
    MaxBytesPerSecond = maxBytesPerSecond; 
    parent = parentStream; 
    processed = 0; 
    resettimer = new System.Timers.Timer(); 
    resettimer.Interval = 1000; 
    resettimer.Elapsed += resettimer_Elapsed; 
    resettimer.Start();   
} 

protected void Throttle(int bytes) 
{ 
    try 
    { 
     processed += bytes; 
     if (processed >= maxBytesPerSecond) 
      wh.WaitOne(); 
    } 
    catch 
    { 
    } 
} 

private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
{ 
    processed = 0; 
    wh.Set(); 
} 

Siempre que el ancho de banda límite se excede el hilo dormir hasta que comience el próximo segundo. No es necesario calcular la duración óptima del sueño.

plena aplicación:

public class ThrottledStream : Stream 
{ 
    #region Properties 

    private int maxBytesPerSecond; 
    /// <summary> 
    /// Number of Bytes that are allowed per second 
    /// </summary> 
    public int MaxBytesPerSecond 
    { 
     get { return maxBytesPerSecond; } 
     set 
     { 
      if (value < 1) 
       throw new ArgumentException("MaxBytesPerSecond has to be >0"); 

      maxBytesPerSecond = value; 
     } 
    } 

    #endregion 


    #region Private Members 

    private int processed; 
    System.Timers.Timer resettimer; 
    AutoResetEvent wh = new AutoResetEvent(true); 
    private Stream parent; 

    #endregion 

    /// <summary> 
    /// Creates a new Stream with Databandwith cap 
    /// </summary> 
    /// <param name="parentStream"></param> 
    /// <param name="maxBytesPerSecond"></param> 
    public ThrottledStream(Stream parentStream, int maxBytesPerSecond=int.MaxValue) 
    { 
     MaxBytesPerSecond = maxBytesPerSecond; 
     parent = parentStream; 
     processed = 0; 
     resettimer = new System.Timers.Timer(); 
     resettimer.Interval = 1000; 
     resettimer.Elapsed += resettimer_Elapsed; 
     resettimer.Start();   
    } 

    protected void Throttle(int bytes) 
    { 
     try 
     { 
      processed += bytes; 
      if (processed >= maxBytesPerSecond) 
       wh.WaitOne(); 
     } 
     catch 
     { 
     } 
    } 

    private void resettimer_Elapsed(object sender, ElapsedEventArgs e) 
    { 
     processed = 0; 
     wh.Set(); 
    } 

    #region Stream-Overrides 

    public override void Close() 
    { 
     resettimer.Stop(); 
     resettimer.Close(); 
     base.Close(); 
    } 
    protected override void Dispose(bool disposing) 
    { 
     resettimer.Dispose(); 
     base.Dispose(disposing); 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     return parent.Read(buffer, offset, count); 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 

    #endregion 


} 
+0

Será más preciso si no configura 'processed' en' 0' cuando el temporizador marca, pero reste 'maxBytesPerSecond' de él. –

1

Basado en @ solución de 0xLOQUESEA he creado la siguiente solución (comprobable) basado en Rx programadores:

public class ThrottledStream : Stream 
{ 
    private readonly Stream parent; 
    private readonly int maxBytesPerSecond; 
    private readonly IScheduler scheduler; 
    private readonly IStopwatch stopwatch; 

    private long processed; 

    public ThrottledStream(Stream parent, int maxBytesPerSecond, IScheduler scheduler) 
    { 
     this.maxBytesPerSecond = maxBytesPerSecond; 
     this.parent = parent; 
     this.scheduler = scheduler; 
     stopwatch = scheduler.StartStopwatch(); 
     processed = 0; 
    } 

    public ThrottledStream(Stream parent, int maxBytesPerSecond) 
     : this (parent, maxBytesPerSecond, Scheduler.Immediate) 
    { 
    } 

    protected void Throttle(int bytes) 
    { 
     processed += bytes; 
     var targetTime = TimeSpan.FromSeconds((double)processed/maxBytesPerSecond); 
     var actualTime = stopwatch.Elapsed; 
     var sleep = targetTime - actualTime; 
     if (sleep > TimeSpan.Zero) 
     { 
      using (var waitHandle = new AutoResetEvent(initialState: false)) 
      { 
       scheduler.Sleep(sleep).GetAwaiter().OnCompleted(() => waitHandle.Set()); 
       waitHandle.WaitOne(); 
      } 
     } 
    } 

    public override bool CanRead 
    { 
     get { return parent.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return parent.CanSeek; } 
    } 

    public override bool CanWrite 
    { 
     get { return parent.CanWrite; } 
    } 

    public override void Flush() 
    { 
     parent.Flush(); 
    } 

    public override long Length 
    { 
     get { return parent.Length; } 
    } 

    public override long Position 
    { 
     get 
     { 
      return parent.Position; 
     } 
     set 
     { 
      parent.Position = value; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     var read = parent.Read(buffer, offset, count); 
     Throttle(read); 
     return read; 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     return parent.Seek(offset, origin); 
    } 

    public override void SetLength(long value) 
    { 
     parent.SetLength(value); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     Throttle(count); 
     parent.Write(buffer, offset, count); 
    } 
} 

y algunas pruebas de que sólo ocupan algunos milisegundos:

[TestMethod] 
public void ShouldThrottleReading() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new ThrottledStream(new MemoryStream(content), content.Length/8, scheduler); 
    var target = new MemoryStream(); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
} 

[TestMethod] 
public void ShouldThrottleWriting() 
{ 
    var content = Enumerable 
     .Range(0, 1024 * 1024) 
     .Select(_ => (byte)'a') 
     .ToArray(); 
    var scheduler = new TestScheduler(); 
    var source = new MemoryStream(content); 
    var target = new ThrottledStream(new MemoryStream(), content.Length/8, scheduler); 

    var t = source.CopyToAsync(target); 

    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(4).Ticks); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks - 1); 
    t.Wait(10).Should().BeFalse(); 
    scheduler.AdvanceTo(TimeSpan.FromSeconds(8).Ticks); 
    t.Wait(10).Should().BeTrue(); 
} 
Cuestiones relacionadas