2010-07-09 15 views
25

Quiero estrangular eficazmente una secuencia de eventos, de modo que se llame a mi delegado cuando se recibe el primer evento pero luego no durante 1 segundo si se reciben eventos posteriores. Después de la expiración de ese tiempo de espera (1 segundo), si se recibió un evento posterior, quiero que se llame a mi delegado.¿Cómo estrangular la secuencia de eventos con RX?

¿Hay una manera simple de hacer esto usando Reactive Extensions?

Código de ejemplo:

static void Main(string[] args) 
{ 
    Console.WriteLine("Running..."); 

    var generator = Observable 
     .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
     .Timestamp(); 

    var builder = new StringBuilder(); 

    generator 
     .Sample(TimeSpan.FromSeconds(1)) 
     .Finally(() => Console.WriteLine(builder.ToString())) 
     .Subscribe(feed => 
        builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}", 
                feed.Value, 
                feed.Timestamp.ToString("mm:ss.fff"), 
                DateTime.Now.ToString("mm:ss.fff")))); 

    Console.ReadKey(); 
} 

Salida de corriente:

Running... 
Observed 064, generated at 41:43.602, observed at 41:43.602 
Observed 100, generated at 41:44.165, observed at 41:44.602 

Pero quiero observar (marcas de tiempo, obviamente, va a cambiar)

Running... 
Observed 001, generated at 41:43.602, observed at 41:43.602 
.... 
Observed 100, generated at 41:44.165, observed at 41:44.602 
+14

eso es sólo una declaración fresca lambda 'x => x <= 100';) – Oliver

Respuesta

11

Aquí es lo que tengo con la ayuda de la RX Foro:

La idea es emitir una serie de "entradas" para la secuencia original al fuego. Estas "entradas" se retrasan por el tiempo de espera, excluyendo la primera, que se agrega inmediatamente a la secuencia del ticket. Cuando llega un evento y hay un ticket esperando, el evento se dispara de inmediato, de lo contrario, espera hasta el ticket y luego se dispara. Cuando se dispara, se emite el próximo ticket, y así sucesivamente ...

Para combinar los tickets y los eventos originales, necesitamos un combinador. Desafortunadamente, el "estándar" .CombineLatest no se puede usar aquí porque se activará en tickets y eventos que se utilizaron previamente. Así que tuve que crear mi propio combinador, que básicamente es un filtro filtrado .CombineLatest, que solo se activa cuando ambos elementos de la combinación son "nuevos", nunca se devolvieron antes. Lo llamo .CombineVeryLatest aka .BrokenZip;)

Usando .CombineVeryLatest, la idea anterior se puede implementar como tal:

public static IObservable<T> SampleResponsive<T>(
     this IObservable<T> source, TimeSpan delay) 
    { 
     return source.Publish(src => 
     { 
      var fire = new Subject<T>(); 

      var whenCanFire = fire 
       .Select(u => new Unit()) 
       .Delay(delay) 
       .StartWith(new Unit()); 

      var subscription = src 
       .CombineVeryLatest(whenCanFire, (x, flag) => x) 
       .Subscribe(fire); 

      return fire.Finally(subscription.Dispose); 
     }); 
    } 

    public static IObservable<TResult> CombineVeryLatest 
     <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, 
     IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) 
    { 
     var ls = leftSource.Select(x => new Used<TLeft>(x)); 
     var rs = rightSource.Select(x => new Used<TRight>(x)); 
     var cmb = ls.CombineLatest(rs, (x, y) => new { x, y }); 
     var fltCmb = cmb 
      .Where(a => !(a.x.IsUsed || a.y.IsUsed)) 
      .Do(a => { a.x.IsUsed = true; a.y.IsUsed = true; }); 
     return fltCmb.Select(a => selector(a.x.Value, a.y.Value)); 
    } 

    private class Used<T> 
    { 
     internal T Value { get; private set; } 
     internal bool IsUsed { get; set; } 

     internal Used(T value) 
     { 
      Value = value; 
     } 
    } 

Editar: aquí hay otra variante más compacta de CombineVeryLatest propuesto por Andreas Köpf en el foro :

public static IObservable<TResult> CombineVeryLatest 
    <TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) 
{ 
    return Observable.Defer(() => 
    { 
     int l = -1, r = -1; 
     return Observable.CombineLatest(
      leftSource.Select(Tuple.Create<TLeft, int>), 
      rightSource.Select(Tuple.Create<TRight, int>), 
       (x, y) => new { x, y }) 
      .Where(t => t.x.Item2 != l && t.y.Item2 != r) 
      .Do(t => { l = t.x.Item2; r = t.y.Item2; }) 
      .Select(t => selector(t.x.Item1, t.y.Item1)); 
    }); 
} 
0

¿Has probado el método Throttle extensión?

A partir de los documentos:

ignora los valores de una secuencia observables que son seguidos por otro valor antes dueTime

No bastante claro para mí es si eso va a hacer lo que quiere o no - en eso quiere ignorar los siguientes valores en lugar del primer valor ... pero lo haría espero que sea lo que quiere. Pruébalo :)

EDITAR: Hmmm ... no, no creo que Throttlees lo correcto, después de todo. Creo que veo lo que quieres hacer, pero no puedo ver nada en el marco para hacerlo. Aunque bien podría haberme perdido algo. ¿Has preguntado en el foro de Rx? Es muy posible que si no es ahora, estarían encantados de añadirlo :)

I sospechoso que podría hacerlo con astucia con SkipUntil y SelectMany de alguna manera ... pero creo que debería estar en su propio método.

+0

Gracias Jon. Le di una oportunidad, pero no es exactamente lo que quiero. En el ejemplo, al utilizar Throttle se ignoran todos los eventos excepto el último. Necesito reaccionar ante el primer evento (para proporcionar un sistema receptivo), pero luego retrasarlo a una tasa de muestreo de 1 segundo para los eventos posteriores. – Alex

+0

@Alex: Sí, eso es lo que encontré también. (Ver mi edición.) –

0

Lo que está buscando es el CombineLatest.

public static IObservable<TResult> CombineLatest<TLeft, TRight, TResult>(
    IObservable<TLeft> leftSource, 
    IObservable<TRight> rightSource, 
    Func<TLeft, TRight, TResult> selector 
) 

que fusiona 2 obeservables, y devolver todos los valores, cuando el selector (tiempo) tiene un valor.

edición: John tiene razón, eso no es quizá la solución preferida

+0

No veo cómo eso es lo que él busca: ¿qué ves como los dos observables aquí? –

+0

Uno es los Eventos que genera, el selector es un Observable. Intervalo (TimeSpan.FromSeconds (1)) – cRichter

+0

¿No generará un evento cada vez * cualquiera * de ellos produce un valor? –

9

bien,

tienes 3 escenarios aquí:

1) que me gustaría obtener un valor de la secuencia de eventos cada segundo. significa que si produce más eventos por segundo, obtendrá un buffer siempre mayor.

observableStream.Throttle(timeSpan) 

2) Me gustaría obtener el último evento, que se produjo antes de la segunda sucede medios: otros eventos quedar afuera.

observableStream.Sample(TimeSpan.FromSeconds(1)) 

3) le gustaría obtener todos los eventos, que ocurrieron en el último segundo. y que cada segundo

observableStream.BufferWithTime(timeSpan) 

4) que desea seleccionar lo que ocurre en entre el segundo con todos los valores, hasta que el segundo ha pasado, y el resultado se devuelve

observableStream.CombineLatest(Observable.Interval(1000), selectorOnEachEvent) 
+1

Dang, el escenario 2 es exactamente lo que estoy buscando, y no he podido encontrar el método tampoco :( – deadlydog

+1

En caso de que alguien lo necesite: 'stream. Sample (TimeSpan.FromSeconds (1)) ' – AlexFoxGill

2

Ok, aquí hay uno solución. No me gusta particularmente, pero ... bueno.

Sombrero consejos a Jon por señalarme en SkipWhile, y a cRichter por el BufferWithTime. Gracias chicos.

static void Main(string[] args) 
{ 
    Console.WriteLine("Running..."); 

    var generator = Observable 
     .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
     .Timestamp(); 

    var bufferedAtOneSec = generator.BufferWithTime(TimeSpan.FromSeconds(1)); 

    var action = new Action<Timestamped<int>>(
     feed => Console.WriteLine("Observed {0:000}, generated at {1}, observed at {2}", 
            feed.Value, 
            feed.Timestamp.ToString("mm:ss.fff"), 
            DateTime.Now.ToString("mm:ss.fff"))); 

    var reactImmediately = true; 
    bufferedAtOneSec.Subscribe(list => 
            { 
             if (list.Count == 0) 
             { 
              reactImmediately = true; 
             } 
             else 
             { 
              action(list.Last()); 
             } 
            }); 
    generator 
     .SkipWhile(item => reactImmediately == false) 
     .Subscribe(feed => 
         { 
          if(reactImmediately) 
          { 
           reactImmediately = false; 
           action(feed); 
          } 
         }); 

    Console.ReadKey(); 
} 
5

Este es el lo que he publicado como una respuesta a esta pregunta en el Rx forum:

ACTUALIZACIÓN: Aquí está una nueva versión que ya no retrasar el reenvío de eventos cuando se producen eventos con un tiempo de diferencia de más de un segundo:

public static IObservable<T> ThrottleResponsive3<T>(this IObservable<T> source, TimeSpan minInterval) 
{ 
    return Observable.CreateWithDisposable<T>(o => 
    { 
     object gate = new Object(); 
     Notification<T> last = null, lastNonTerminal = null; 
     DateTime referenceTime = DateTime.UtcNow - minInterval; 
     var delayedReplay = new MutableDisposable(); 
     return new CompositeDisposable(source.Materialize().Subscribe(x => 
     { 
      lock (gate) 
      { 
       var elapsed = DateTime.UtcNow - referenceTime; 
       if (elapsed >= minInterval && delayedReplay.Disposable == null) 
       { 
        referenceTime = DateTime.UtcNow; 
        x.Accept(o); 
       } 
       else 
       { 
        if (x.Kind == NotificationKind.OnNext) 
         lastNonTerminal = x; 
        last = x; 
        if (delayedReplay.Disposable == null) 
        { 
         delayedReplay.Disposable = Scheduler.ThreadPool.Schedule(() => 
         { 
          lock (gate) 
          { 
           referenceTime = DateTime.UtcNow; 
           if (lastNonTerminal != null && lastNonTerminal != last) 
            lastNonTerminal.Accept(o); 
           last.Accept(o); 
           last = lastNonTerminal = null; 
           delayedReplay.Disposable = null; 
          } 
         }, minInterval - elapsed); 
        } 
       } 
      } 
     }), delayedReplay); 
    }); 
} 

Esta fue mi anterior intento:

var source = Observable.GenerateWithTime(1, 
    x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1) 
    .Timestamp(); 

source.Publish(o => 
    o.Take(1).Merge(o.Skip(1).Sample(TimeSpan.FromSeconds(1))) 
).Run(x => Console.WriteLine(x)); 
+4

Acabo de encontrar un enlace a esta respuesta en los créditos de la aplicación Microsoft Todo UWP :) –

6

he tenido problemas con este mismo problema ayer por la noche, y creo que he encontrado una (o al menos más corto) solución más elegante:

var delay = Observable.Empty<T>().Delay(TimeSpan.FromSeconds(1)); 
var throttledSource = source.Take(1).Concat(delay).Repeat(); 
0

Inspirado en la respuesta de Bluelings Aquí proporciono una versión que compila con Reactive Extensions 2.2.5.

Esta versión particular cuenta el número de muestras y también proporciona el último valor muestreado.Para ello la siguiente clase se utiliza:

class Sample<T> { 

    public Sample(T lastValue, Int32 count) { 
    LastValue = lastValue; 
    Count = count; 
    } 

    public T LastValue { get; private set; } 

    public Int32 Count { get; private set; } 

} 

Aquí es el operador:

public static IObservable<Sample<T>> SampleResponsive<T>(this IObservable<T> source, TimeSpan interval, IScheduler scheduler = null) { 
    if (source == null) 
    throw new ArgumentNullException(nameof(source)); 
    return Observable.Create<Sample<T>>(
    observer => { 
     var gate = new Object(); 
     var lastSampleValue = default(T); 
     var lastSampleTime = default(DateTime); 
     var sampleCount = 0; 
     var scheduledTask = new SerialDisposable(); 
     return new CompositeDisposable(
     source.Subscribe(
      value => { 
      lock (gate) { 
       var now = DateTime.UtcNow; 
       var elapsed = now - lastSampleTime; 
       if (elapsed >= interval) { 
       observer.OnNext(new Sample<T>(value, 1)); 
       lastSampleValue = value; 
       lastSampleTime = now; 
       sampleCount = 0; 
       } 
       else { 
       if (scheduledTask.Disposable == null) { 
        scheduledTask.Disposable = (scheduler ?? Scheduler.Default).Schedule(
        interval - elapsed, 
        () => { 
         lock (gate) { 
         if (sampleCount > 0) { 
          lastSampleTime = DateTime.UtcNow; 
          observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
          sampleCount = 0; 
         } 
         scheduledTask.Disposable = null; 
         } 
        } 
       ); 
       } 
       lastSampleValue = value; 
       sampleCount += 1; 
       } 
      } 
      }, 
      error => { 
      if (sampleCount > 0) 
       observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
      observer.OnError(error); 
      }, 
     () => { 
      if (sampleCount > 0) 
       observer.OnNext(new Sample<T>(lastSampleValue, sampleCount)); 
      observer.OnCompleted(); 
      } 
     ), 
     scheduledTask 
    ); 
    } 
); 
} 
Cuestiones relacionadas