2010-11-08 10 views
11

Lo que trato de hacer es almacenar en el búfer los eventos entrantes de algunos IObservable (vienen en ráfagas) y soltarlos más, pero uno por uno, en intervalos pares. De esta manera:Una forma de enviar eventos almacenados temporalmente a intervalos regulares

-oo-ooo-oo------------------oooo-oo-o--------------> 

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o----> 

Ya que soy bastante nuevo en Rx, no estoy seguro de si ya hay un tema o un operador que hace precisamente esto. ¿Tal vez se puede hacer por composición?

actualización:

Gracias a Richard Szalay para señalar la drenaje operador, he encontrado otra example by James Miles del uso del operador de drenaje. Así es como me las arreglé para conseguir que funcione en un WPF aplicación:

.Drain(x => { 
     Process(x); 
     return Observable.Return(new Unit()) 
      .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher); 
    }).Subscribe(); 

que había un poco de diversión, debido a la omisión del parámetro planificador hace que el cierre de la aplicación en modo de depuración sin excepción a aparecer (I tenga que aprender para tratar con excepciones en Rx). El método de proceso modifica el estado de la interfaz de usuario directamente, pero supongo que es bastante simple hacer un IObservable (utilizando un objeto IS?).

actualización:

Mientras tanto, yo he estado experimentando con ISubject, la clase de abajo hace lo que quería - que deja escapar tamponada Ts en el momento oportuno:

public class StepSubject<T> : ISubject<T> 
{ 
    IObserver<T> subscriber; 
    Queue<T> queue = new Queue<T>(); 
    MutableDisposable cancel = new MutableDisposable(); 
    TimeSpan interval; 
    IScheduler scheduler; 
    bool idle = true; 

    public StepSubject(TimeSpan interval, IScheduler scheduler) 
    { 
     this.interval = interval; 
     this.scheduler = scheduler; 
    } 

    void Step() 
    { 
     T next; 
     lock (queue) 
     { 
      idle = queue.Count == 0; 
      if (!idle) 
       next = queue.Dequeue(); 
     } 

     if (!idle) 
     { 
      cancel.Disposable = scheduler.Schedule(Step, interval); 
      subscriber.OnNext(next); 
     } 
    } 

    public void OnNext(T value) 
    { 
     lock (queue) 
      queue.Enqueue(value); 

     if (idle) 
      cancel.Disposable = scheduler.Schedule(Step); 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     subscriber = observer; 
     return cancel; 
    } 
} 

Esta ingenua la implementación se elimina de OnCompleted y OnError para mayor claridad, también solo se permite la suscripción única.

+2

Su tema no es multi-hilo. Está comprobando si su cola está vacía dentro de un bloqueo, pero luego está quitando de la cerradura. –

+0

¡Gracias, lo arreglé! – majocha

Respuesta

9

En realidad es más complicado de lo que parece.

El uso de Delay no funciona porque los valores seguirán apareciendo a granel, solo un poco retrasados.

El uso de Interval con CombineLatest o Zip no funciona, ya que el primero hará que los valores de origen se omitan y el segundo almacenará los valores de intervalo.

Creo que la nueva Drain operador (added in 1.0.2787.0), combinado con Delay debe hacer el truco:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x)); 

El operador Drain funciona como SelectMany, sino que espera hasta la salida anterior se completa antes de llamar al selector con el siguiente valor. Todavía no es exactamente lo que está buscando (el primer valor en un bloque también se retrasará), pero está cerca: El uso anterior coincide con su diagrama de mármol ahora.

Editar: Al parecer, el Drain en el marco no funciona como SelectMany. Pediré algunos consejos en los foros oficiales.Mientras tanto, aquí hay una implementación de Drain que hace lo que está buscando:

Editar 09/11: Se corrigieron los errores en la implementación y el uso actualizado para que coincida con el diagrama de mármol solicitado.

public static class ObservableDrainExtensions 
{ 
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
     Func<TSource, IObservable<TOut>> selector) 
    { 
     return Observable.Defer(() => 
     { 
      BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 

      return source 
       .Zip(queue, (v, q) => v) 
       .SelectMany(v => selector(v) 
        .Do(_ => { },() => queue.OnNext(new Unit())) 
       ); 
     }); 
    } 
} 
+0

¡Gracias por la respuesta! Drenaje está cerca, pero el problema principal es que no devuelve un IObservable , por lo que no se puede utilizar en una consulta o una cadena con otros operadores. Espera que el parámetro Func devuelva un IObservable , que evita que la muestra se compile. – majocha

+0

Lamentablemente no tengo problemas con la compilación de ese ejemplo demasiado :( – majocha

+0

Gracias por la actualización! He intentado utilizar la función con .Drain (s => Observable.Return (s) .Delay (TimeSpan.FromSeconds (1), Scheduler.Dispatcher)) y funciona, además de retrasar la primera notificación. – majocha

2

simplemente para la corrección aquí es una versión alterantive (más compacto) del método de drenaje() sugerido por Richard:

public static IObservable<T2> SelectManySequential<T1, T2>(
    this IObservable<T1> source, 
    Func<T1, IObservable<T2>> selector 
) 
{ 
    return source 
     .Select(x => Observable.Defer<T2>(() => selector(x))) 
     .Concat(); 
} 

Ver el hilo Drain + SelectMany = ? en el foro Rx.

Actualización: me di cuenta de que la sobrecarga de concatenación() que utilicé fue una de mis extensiones personales Rx que forman parte (todavía no) del marco. Lo siento por este error ... Por supuesto, esto hace que mi solución sea menos elegante de lo que pensaba.

Sin embargo para la integridad he puesto aquí mi ConAct() método de extensión de sobrecarga:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source) 
{ 
    return Observable.CreateWithDisposable<T>(o => 
    { 
     var lockCookie = new Object(); 
     bool completed = false; 
     bool subscribed = false; 
     var waiting = new Queue<IObservable<T>>(); 
     var pendingSubscription = new MutableDisposable(); 

     Action<Exception> errorHandler = e => 
     { 
      o.OnError(e); 
      pendingSubscription.Dispose(); 
     }; 

     Func<IObservable<T>, IDisposable> subscribe = null; 
     subscribe = (ob) => 
     { 
      subscribed = true; 
      return ob.Subscribe(
       o.OnNext, 
       errorHandler, 
       () => 
       { 
        lock (lockCookie) 
        { 
         if (waiting.Count > 0) 
          pendingSubscription.Disposable = subscribe(waiting.Dequeue()); 
         else if (completed) 
          o.OnCompleted(); 
         else 
          subscribed = false; 
        } 
       } 
      ); 
     }; 

     return new CompositeDisposable(pendingSubscription, 
      source.Subscribe(
       n => 
       { 
        lock (lockCookie) 
        { 
         if (!subscribed) 
          pendingSubscription.Disposable = subscribe(n); 
         else 
          waiting.Enqueue(n); 
        } 

       }, 
       errorHandler 
       ,() => 
       { 
        lock (lockCookie) 
        { 
         completed = true; 
         if (!subscribed) 
          o.OnCompleted(); 
        } 
       } 
      ) 
     ); 
    }); 
} 

Y ahora superando a mí mismo con mis propias armas: El mismo método concat() se podría escribir mucho más elegante en Richard Szalay de brillante manera:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source) 
{ 
    return Observable.Defer(() => 
    { 
     BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit()); 
     return source 
      .Zip(queue, (v, q) => v) 
      .SelectMany(v => 
       v.Do(_ => { },() => queue.OnNext(new Unit())) 
      ); 
    }); 
} 

Así crédito pertenece a Richard. :-)

+0

Gracias por el enlace y el código! – majocha

+0

de hecho, un elegante d brillante! Entender ese "cortocircuito" me llevó algo de tiempo, pero la satisfacción cuando finalmente lo entendí no tiene precio :) – majocha

2

Así es como lo hice, simplemente usando una cola explícita (ReactiveCollection es sólo una versión de lujo de ObservableCollection de WPF - ReactiveCollection.ItemsAdded de OnNext para cada elemento añadido, como se puede imaginar):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null) 
{ 
    var ret = new ReactiveCollection<T>(); 
    if (WithDelay == null) { 
     FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add); 
     return ret; 
    } 

    // On a timer, dequeue items from queue if they are available 
    var queue = new Queue<T>(); 
    var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value) 
     .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => { 
      if (queue.Count > 0) { 
       ret.Add(queue.Dequeue()); 
      } 
     }); 

    // When new items come in from the observable, stuff them in the queue. 
    // Using the DeferredScheduler guarantees we'll always access the queue 
    // from the same thread. 
    FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue); 

    // This is a bit clever - keep a running count of the items actually 
    // added and compare them to the final count of items provided by the 
    // Observable. Combine the two values, and when they're equal, 
    // disconnect the timer 
    ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
     (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose()); 

    return ret; 
} 
Cuestiones relacionadas