2012-04-21 6 views
7

Necesito implementar un mecanismo de aceleración (solicitudes por segundo) cuando uso HttpWebRequest para realizar solicitudes paralelas hacia un servidor de aplicaciones. Mi aplicación C# debe emitir no más de 80 solicitudes por segundo a un servidor remoto. El límite lo imponen los administradores del servicio remoto no como un límite estricto sino como "SLA" entre mi plataforma y la de ellos.¿Cómo limitar el número de HttpWebRequest por segundo hacia un servidor web?

¿Cómo puedo controlar el número de solicitudes por segundo al usar HttpWebRequest?

Respuesta

3

Tuve el mismo problema y no pude encontrar una solución preparada, así que hice una, y aquí está. La idea es usar BlockingCollection<T> para agregar elementos que necesitan procesamiento y usar Reactive Extensions para suscribirse con un procesador de velocidad limitada.

clase Throttle es la versión renombrada de this rate limiter

public static class BlockingCollectionExtensions 
{ 
    // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed) 
    public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken) 
    { 
     Subject<T> subject = new Subject<T>(); 

     // this is a dummyToken just so we can recreate the TokenSource 
     // which we will pass the proxy class so it can cancel the task 
     // on disposal 
     CancellationToken dummyToken = new CancellationToken(); 
     CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken); 

     var consumingTask = new Task(() => 
     { 
      using (var throttle = new Throttle(items, timePeriod)) 
      { 
       while (!sequence.IsCompleted) 
       { 
        try 
        { 
         T item = sequence.Take(producerToken); 
         throttle.WaitToProceed(); 
         try 
         { 
          subject.OnNext(item); 
         } 
         catch (Exception ex) 
         { 
          subject.OnError(ex); 
         } 
        } 
        catch (OperationCanceledException) 
        { 
         break; 
        } 
       } 
       subject.OnCompleted(); 
      } 
     }, TaskCreationOptions.LongRunning); 

     return new TaskAwareObservable<T>(subject, consumingTask, tokenSource); 
    } 

    private class TaskAwareObservable<T> : IObservable<T>, IDisposable 
    { 
     private readonly Task task; 
     private readonly Subject<T> subject; 
     private readonly CancellationTokenSource taskCancellationTokenSource; 

     public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource) 
     { 
      this.task = task; 
      this.subject = subject; 
      this.taskCancellationTokenSource = tokenSource; 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      var disposable = subject.Subscribe(observer); 
      if (task.Status == TaskStatus.Created) 
       task.Start(); 
      return disposable; 
     } 

     public void Dispose() 
     { 
      // cancel consumption and wait task to finish 
      taskCancellationTokenSource.Cancel(); 
      task.Wait(); 

      // dispose tokenSource and task 
      taskCancellationTokenSource.Dispose(); 
      task.Dispose(); 

      // dispose subject 
      subject.Dispose(); 
     } 
    } 
} 

Unidad de prueba:

class BlockCollectionExtensionsTest 
{ 
    [Fact] 
    public void AsRateLimitedObservable() 
    { 
     const int maxItems = 1; // fix this to 1 to ease testing 
     TimeSpan during = TimeSpan.FromSeconds(1); 

     // populate collection 
     int[] items = new[] { 1, 2, 3, 4 }; 
     BlockingCollection<int> collection = new BlockingCollection<int>(); 
     foreach (var i in items) collection.Add(i); 
     collection.CompleteAdding(); 

     IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None); 
     BlockingCollection<int> processedItems = new BlockingCollection<int>(); 
     ManualResetEvent completed = new ManualResetEvent(false); 
     DateTime last = DateTime.UtcNow; 
     observable 
      // this is so we'll receive exceptions 
      .ObserveOn(new SynchronizationContext()) 
      .Subscribe(item => 
       { 
        if (item == 1) 
         last = DateTime.UtcNow; 
        else 
        { 
         TimeSpan diff = (DateTime.UtcNow - last); 
         last = DateTime.UtcNow; 

         Assert.InRange(diff.TotalMilliseconds, 
          during.TotalMilliseconds - 30, 
          during.TotalMilliseconds + 30); 
        } 
        processedItems.Add(item); 
       }, 
       () => completed.Set() 
      ); 
     completed.WaitOne(); 
     Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>()); 
    } 
} 
+0

algo malo le pasó a la URL –

-1

Mi publicación original discutía cómo agregar un mecanismo de aceleración a WCF a través de las extensiones de comportamiento del cliente, pero luego se señaló que leí mal la pregunta (¡doh!).

En general, el enfoque puede ser verificar con una clase que determina si estamos violando el límite de velocidad o no. Ya ha habido mucha discusión sobre cómo verificar violaciones de tarifas.

Throttling method calls to M requests in N seconds

Si usted está violando el límite de velocidad, y luego dormir por un intervalo fijo y comprobar de nuevo. De lo contrario, continúe y realice la llamada HttpWebRequest.

+0

En la pregunta, no me refiero a un servicio web WCF. Se trata de un uso simple de la clase HttpWebRequest. –

+0

Ah, es tarde y debería haber leído la pregunta más de cerca :) Todavía puede intentar el enfoque de antes de hacer una llamada a HttpWebRequest, consulte con otra clase para asegurarse de que no violará la tasa de 80 solicitudes/seg. Actualizaré mi código de arriba. –

+0

Pidió C# no Java. – SmallChess

0

Los métodos de extensión Throttle() y Sample() (On Observable) le permiten regular una secuencia rápida de eventos en una secuencia "más lenta".

Here is a blog post with an example de Sample(Timespan) que garantiza una tasa máxima.

+0

El problema con Sample() y Throttle() es que saltan/tiran muestras para alcanzar la velocidad especificada. – georgiosd

Cuestiones relacionadas