2012-07-12 7 views
11

Un caso de uso que he encontrado, y sospecho que no puedo ser el único, es para un método como:¿Hay un método Rx para repetir el valor anterior periódicamente cuando no hay valores entrantes?

IObservable<T> Observable.RepeatLastValueDuringSilence(this IObservable<T> inner, TimeSpan maxQuietPeriod); 

que devolver todos los artículos futuros de la observables interno, sino también, si el observable interno no llama a OnNext durante un cierto período de tiempo (maxQuietPeriod), simplemente repite el último valor (hasta, por supuesto, llamadas internas OnCompleted o OnError).

Una justificación sería que un servicio muestre periódicamente una actualización de estado periódica. Por ejemplo:

var myStatus = Observable.FromEvent(
    h=>this.StatusUpdate+=h, 
    h=>this.StatusUpdate-=h); 

var messageBusStatusPinger = myStatus 
    .RepeatLastValueDuringSilence(TimeSpan.FromSeconds(1)) 
    .Subscribe(update => _messageBus.Send(update)); 

¿Algo como esto existe? ¿O estoy sobreestimando su utilidad?

Gracias, Alex

PD: Me disculpo por cualquier terminología/sintaxis incorrecta, ya que sólo estoy sólo para explorar Rx por primera vez.

+0

¿A qué frecuencia debería seguir repitiendo? – Asti

+0

@Asti: la intención era que esto debería ser especificado por maxQuietPeriod. Si la secuencia interna no ha producido un valor para maxQuietPeriod, se debe generar un valor repetido. – AlexC

Respuesta

5

solución similar a la de Mateo, pero aquí el temporizador comienza después de que se recibe cada elemento en la fuente, que creo que es más correcto (sin embargo, es poco probable que importen las diferencias):

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod) 
{  
    return inner.Select(x => 
     Observable.Interval(maxQuietPeriod) 
        .Select(_ => x) 
        .StartWith(x) 
    ).Switch(); 
} 

Y la prueba:

var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "1") 
         .Concat(Observable.Interval(TimeSpan.FromSeconds(1)).Take(5).Select(_ => "2")) 
         .Concat(Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(5).Select(_ => "3")); 

source.RepeatLastValueDuringSilence(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine); 

Debería ver 1 impresos 10 veces (5 de la fuente, 5 repitieron durante el silencio), entonces un montón de 2 como se obtiene el de origen y 4 más de silencio entre cada uno, seguido de infinito 3.

+0

Esto se parece a lo que estaba buscando, gracias. – AlexC

-1

No existe un método en las bibliotecas Rx, pero también tuve la necesidad de tal método. En mi caso de uso, necesitaba mostrar valores incluso si la fuente no arroja ningún valor. Si no desea emitir ningún valor hasta que aparezca el primer valor de origen, puede eliminar el parámetro defaultValue y la llamada al createTimer() antes de la llamada de suscripción.

El programador es necesario para ejecutar el temporizador. Una sobrecarga obvia sería aquella que no toma un programador y selecciona un programador predeterminado (utilicé el programador de ThreadPool).

Imports System.Reactive 
Imports System.Reactive.Concurrency 
Imports System.Reactive.Disposables 
Imports System.Reactive.Linq 

<Extension()> 
Public Function AtLeastEvery(Of T)(source As IObservable(Of T), 
            timeout As TimeSpan, 
            defaultValue As T, 
            scheduler As IScheduler 
           ) As IObservable(Of T) 
    If source Is Nothing Then Throw New ArgumentNullException("source") 
    If scheduler Is Nothing Then Throw New ArgumentNullException("scheduler") 
    Return Observable.Create(
     Function(observer As IObserver(Of T)) 
      Dim id As ULong 
      Dim gate As New Object() 
      Dim timer As New SerialDisposable() 
      Dim lastValue As T = defaultValue 

      Dim createTimer As Action = 
       Sub() 
        Dim startId As ULong = id 
        timer.Disposable = scheduler.Schedule(timeout, 
              Sub(self As Action(Of TimeSpan)) 
               Dim noChange As Boolean 
               SyncLock gate 
                noChange = (id = startId) 
                If noChange Then 
                 observer.OnNext(lastValue) 
                End If 
               End SyncLock 
               'only restart if no change, otherwise 
               'the change restarted the timeout 
               If noChange Then self(timeout) 
              End Sub) 
       End Sub 
      'start the first timeout 
      createTimer() 
      'subscribe to the source observable 
      Dim subscription = source.Subscribe(
       Sub(v) 
        SyncLock gate 
         id += 1UL 
         lastValue = v 
        End SyncLock 
        observer.OnNext(v) 
        createTimer() 'reset the timeout 
       End Sub, 
       Sub(ex) 
        SyncLock gate 
         id += 1UL 
        End SyncLock 
        observer.OnError(ex) 
        'do not reset the timeout, because the sequence has ended 
       End Sub, 
       Sub() 
        SyncLock gate 
         id += 1UL 
        End SyncLock 
        observer.OnCompleted() 
        'do not reset the timeout, because the sequence has ended 
       End Sub) 

      Return New CompositeDisposable(timer, subscription) 
     End Function) 
End Function 
1

creo que esto hace lo que quiere, si su observables no está caliente que necesita para Publish y Refcount que:

public static IObservable<T> RepeatLastValueDuringSilence<T>(this IObservable<T> inner, TimeSpan maxQuietPeriod) 
{ 
    var throttled = inner.Throttle(maxQuietPeriod); 
    var repeating = throttled.SelectMany(i => 
     Observable 
      .Interval(maxQuietPeriod) 
      .Select(_ => i) 
      .TakeUntil(inner)); 
    return Observable.Merge(inner, throttled, repeating); 
} 
+0

Debe tenerse en cuenta que para que esto funcione, el observable debe estar caliente, quizás agregue esto al frente: 'var published = inner.Publish().RefCount(); ' – yamen

+0

Saludos, lo he editado en –

5

Esta consulta bastante simple hace el trabajo:

var query = 
    source 
     .Select(s => 
      Observable 
       .Interval(TimeSpan.FromSeconds(1.0)) 
       .StartWith(s) 
       .Select(x => s)) 
     .Switch(); 

Nunca subestime el poder de .Switch().

+0

Agradable. Quité mi 'TakeUntil' y cambié' Concat' a 'Switch' también. – yamen

Cuestiones relacionadas