2010-11-24 12 views
13

Estoy evaluando Rx para un proyecto de plataforma comercial que necesitará procesar miles de mensajes por segundo. La plataforma existente tiene un sistema complejo de enrutamiento de eventos (delegados de multidifusión) que responde a estos mensajes y realiza una gran cantidad de procesamiento posterior.Las extensiones reactivas parecen muy lentas. ¿Estoy haciendo algo mal?

He examinado las extensiones reactivas para ver los beneficios obvios, pero noté que es algo más lento, normalmente 100 veces más lento.

He creado una prueba unitaria para demostrar esto que ejecuta un simple incremento de 1 millón de veces, usando varios sabores Rx y una prueba de "control" de delegado directo.

Éstos son los resultados:

Delegate         - (1000000) - 00:00:00.0410000 
Observable.Range()      - (1000000) - 00:00:04.8760000 
Subject.Subscribe() - NewThread   - (1000000) - 00:00:02.7630000 
Subject.Subscribe() - CurrentThread  - (1000000) - 00:00:03.0280000 
Subject.Subscribe() - Immediate   - (1000000) - 00:00:03.0030000 
Subject.Subscribe() - ThreadPool   - (1000000) - 00:00:02.9800000 
Subject.Subscribe() - Dispatcher   - (1000000) - 00:00:03.0360000 

Como se puede ver, todos los métodos son Rx ~ 100 veces más lento que un delegado equivalente. Obviamente, Rx está haciendo muchas cosas bajo las coberturas que serán útiles en un ejemplo más complejo, pero esto parece increíblemente lento.

¿Es esto normal o mis hipótesis de prueba son inválidas? código Nunit de lo anterior abajo -

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Linq; 
using System.Text; 
using NUnit.Framework; 
using System.Concurrency; 

namespace RxTests 
{ 
    [TestFixture] 
    class ReactiveExtensionsBenchmark_Tests 
    { 
     private int counter = 0; 

     [Test] 
     public void ReactiveExtensionsPerformanceComparisons() 
     { 
      int iterations = 1000000; 

      Action<int> a = (i) => { counter++; }; 

      DelegateSmokeTest(iterations, a); 
      ObservableRangeTest(iterations, a); 
      SubjectSubscribeTest(iterations, a, Scheduler.NewThread, "NewThread"); 
      SubjectSubscribeTest(iterations, a, Scheduler.CurrentThread, "CurrentThread"); 
      SubjectSubscribeTest(iterations, a, Scheduler.Immediate, "Immediate"); 
      SubjectSubscribeTest(iterations, a, Scheduler.ThreadPool, "ThreadPool"); 
      SubjectSubscribeTest(iterations, a, Scheduler.Dispatcher, "Dispatcher"); 
     } 

     public void ObservableRangeTest(int iterations, Action<int> action) 
     { 
      counter = 0; 

      long start = DateTime.Now.Ticks; 

      Observable.Range(0, iterations).Subscribe(action); 

      OutputTestDuration("Observable.Range()", start); 
     } 


     public void SubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
     { 
      counter = 0; 

      var eventSubject = new Subject<int>(); 
      var events = eventSubject.SubscribeOn(scheduler); //edited - thanks dtb 
      events.Subscribe(action); 

      long start = DateTime.Now.Ticks; 

      Enumerable.Range(0, iterations).ToList().ForEach 
       (
        a => eventSubject.OnNext(1) 
       ); 

      OutputTestDuration("Subject.Subscribe() - " + mode, start); 
     } 

     public void DelegateSmokeTest(int iterations, Action<int> action) 
     { 
      counter = 0; 
      long start = DateTime.Now.Ticks; 

      Enumerable.Range(0, iterations).ToList().ForEach 
       (
        a => action(1) 
       ); 

      OutputTestDuration("Delegate", start); 
     } 


     /// <summary> 
     /// Output helper 
     /// </summary> 
     /// <param name="test"></param> 
     /// <param name="duration"></param> 
     public void OutputTestDuration(string test, long duration) 
     { 
      Debug.WriteLine(string.Format("{0, -40} - ({1}) - {2}", test, counter, ElapsedDuration(duration))); 
     } 

     /// <summary> 
     /// Test timing helper 
     /// </summary> 
     /// <param name="elapsedTicks"></param> 
     /// <returns></returns> 
     public string ElapsedDuration(long elapsedTicks) 
     { 
      return new TimeSpan(DateTime.Now.Ticks - elapsedTicks).ToString(); 
     } 

    } 
} 
+0

SubjectSubscribeTest no usa el argumento 'scheduler', así que me sorprende que en realidad obtengas resultados diferentes. – dtb

+0

¿Y por qué estás usando un tema en lugar de suscribir la acción directamente al observable? Los sujetos de Afaik hacen bastante bajo la cobertura, por lo que debes comprobar si eliminarlos hace la diferencia. – dtb

+0

Me sorprende que el ObservableRangeTest tenga un rendimiento * realmente * malo, incluso en comparación con las pruebas con el tema. Wtf? – dtb

Respuesta

16

Mi conjetura es que el equipo de Rx se centra en la construcción de la primera funcionalidad y no se preocupa por la optimización del rendimiento todavía.

Utilice un generador de perfiles para determinar los cuellos de botella y reemplace las clases lentas de Rx con sus propias versiones optimizadas.

A continuación se muestran dos ejemplos.

Resultados:

 
Delegate         - (1000000) - 00:00:00.0368748 

Simple - NewThread      - (1000000) - 00:00:00.0207676 
Simple - CurrentThread     - (1000000) - 00:00:00.0214599 
Simple - Immediate      - (1000000) - 00:00:00.0162026 
Simple - ThreadPool      - (1000000) - 00:00:00.0169848 

FastSubject.Subscribe() - NewThread  - (1000000) - 00:00:00.0588149 
FastSubject.Subscribe() - CurrentThread - (1000000) - 00:00:00.0508842 
FastSubject.Subscribe() - Immediate  - (1000000) - 00:00:00.0513911 
FastSubject.Subscribe() - ThreadPool  - (1000000) - 00:00:00.0529137 

En primer lugar, parece importar mucho cómo lo observable se implementa. Aquí es un observable que no pueden anular su inscripción, pero es rápido:

private IObservable<int> CreateFastObservable(int iterations) 
{ 
    return Observable.Create<int>(observer => 
    { 
     new Thread(_ => 
     { 
      for (int i = 0; i < iterations; i++) 
      { 
       observer.OnNext(i); 
      } 
      observer.OnCompleted(); 
     }).Start(); 
     return() => { }; 
    }); 
} 

prueba:

public void SimpleObserveTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
{ 
    counter = 0; 

    var start = Stopwatch.StartNew(); 

    var observable = CreateFastObservable(iterations); 

    observable.SubscribeOn(scheduler).Run(action); 

    OutputTestDuration("Simple - " + mode, start); 
} 

Los sujetos añadir un montón de gastos generales. Aquí hay un tema que es despojado de gran parte de la funcionalidad esperada de un sujeto, pero es rápido:

class FastSubject<T> : ISubject<T> 
{ 
    private event Action onCompleted; 
    private event Action<Exception> onError; 
    private event Action<T> onNext; 

    public FastSubject() 
    { 
     onCompleted +=() => { }; 
     onError += error => { }; 
     onNext += value => { }; 
    } 

    public void OnCompleted() 
    { 
     this.onCompleted(); 
    } 

    public void OnError(Exception error) 
    { 
     this.onError(error); 
    } 

    public void OnNext(T value) 
    { 
     this.onNext(value); 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     this.onCompleted += observer.OnCompleted; 
     this.onError += observer.OnError; 
     this.onNext += observer.OnNext; 

     return Disposable.Create(() => 
     { 
      this.onCompleted -= observer.OnCompleted; 
      this.onError -= observer.OnError; 
      this.onNext -= observer.OnNext; 
     }); 
    } 
} 

prueba:

public void FastSubjectSubscribeTest(int iterations, Action<int> action, IScheduler scheduler, string mode) 
{ 
    counter = 0; 

    var start = Stopwatch.StartNew(); 

    var observable = new ConnectableObservable<int>(CreateFastObservable(iterations), new FastSubject<int>()).RefCount(); 

    observable.SubscribeOn(scheduler).Run(action); 

    OutputTestDuration("FastSubject.Subscribe() - " + mode, start); 
} 
+0

Wow. Muchas gracias por tu tiempo en esto. –

+2

Observable.Range tiene una sobrecarga con un Programador como parámetro, entonces, esto retorna (usando Scheduler.Immediate) en 00: 00: 00.6698080 para mí: public void SimpleObserveTest (int iterations, Action action, planificador IScheduler, modo de cadena) {contador = 0; var start = Stopwatch.StartNew(); Nivel observable (0, iteraciones, programador). Ejecutar (acción); OutputTestDuration ("Simple -" + modo, inicio); } –

+0

@Richard Hein: Buen hallazgo. Entonces, el Rango Observable depende mucho del planificador utilizado. Sin embargo, todavía es varias veces más lento que mi "FastObservable". Pero vale la pena saber qué perillas cambian qué. – dtb

10

Recuerde que el delegado no garantiza ninguna seguridad de los subprocesos - literalmente llama al delegado desde el hilo desde el que se llama, mientras que cuando llamas a Observable.ObserveOn para enviar notificaciones a otros hilos, Rx.NET tiene que hacer el bloqueo para asegurarte de que hace lo que crees que hace.

Por lo tanto, los delegados pueden moverse muy rápido, pero si quiere construir algo práctico usándolo, terminará construyendo sincronizaciones con la mano, lo que lo hará más lento. Habiendo dicho eso, Rx, al igual que LINQ, es una abstracción; si necesitas que sea ridículamente rápido, tienes que empezar a escribir código feo.

+0

Sí, comenzando a darse cuenta de los problemas con las suposiciones en el código ahora ;-) –

12

Actualización para Rx 2.0: Tomé el código del poste original con (casi) la última versión beta LINQPad 4.42.04 (también hay un 06, pero de todos modos): Rx Main assemblies

... y ajustado ligeramente para utilizar el nuevo v2 Rx sintaxis planificador:

 public void ReactiveExtensionsPerformanceComparisons() 
    { 
     int iterations = 1000000; 

     Action<int> a = (i) => { counter++; }; 

     DelegateSmokeTest(iterations, a); 
     ObservableRangeTest(iterations, a); 
     SubjectSubscribeTest(iterations, a, NewThreadScheduler.Default, "NewThread"); 
     SubjectSubscribeTest(iterations, a, CurrentThreadScheduler.Instance, "CurrentThread"); 
     SubjectSubscribeTest(iterations, a, ImmediateScheduler.Instance, "Immediate"); 
     SubjectSubscribeTest(iterations, a, ThreadPoolScheduler.Instance, "ThreadPool"); 
     // I *think* this is the same as the ThreadPool scheduler in my case 
     SubjectSubscribeTest(iterations, a, DefaultScheduler.Instance, "Default");     
     // doesn't work, as LinqPad has no Dispatcher attched to the Gui thread, maybe there's a workaround; the Instance property on it is obsolete 
     //SubjectSubscribeTest(iterations, a, DispatcherScheduler.Current, "ThreadPool"); 
    } 

Nota: resultados variar mucho, en casos raros Threadpool late newThread, pero en la mayoría de los casos NewThread tiene una ligera ventaja por encima de los programadores por debajo de él en la lista:

Delegate         - (1000000) - 00:00:00.0440025 
Observable.Range()      - (1000000) - 00:00:01.9251101 
Subject.Subscribe() - NewThread   - (1000000) - 00:00:00.0400023 
Subject.Subscribe() - CurrentThread  - (1000000) - 00:00:00.0530030 
Subject.Subscribe() - Immediate   - (1000000) - 00:00:00.0490028 
Subject.Subscribe() - ThreadPool   - (1000000) - 00:00:00.0490028 
Subject.Subscribe() - Default   - (1000000) - 00:00:00.0480028 

Parece que trabajaron bastante duro en el rendimiento ...

+0

¡Gracias por esta actualización! – film42

Cuestiones relacionadas