2012-10-08 23 views
7

Dada una fuente observable, generada mediante el sondeo de los cambios (a) estado de un dispositivo de bajo nivel ...Rx Marco: ejecutar una acción en tiempo de espera sin interrumpir la secuencia original, observable

// observable source metacode: 
IObservable<DeviceState> source = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
    .Select(tick => new DeviceState(_device.ReadValue())) 
    .DistinctUntilChanged(); 

.. . y un consumidor que actualiza la interfaz de usuario ...

// UI metacode: 
service.GetObservableDeviceStates() 
    .Subscribe(state => viewModel.CurrentState = state.ToString()); 

... necesito para ejecutar una acción personalizada después de x segundos de "inactividad" de la fuente, sin interrumpir la suscripción a la fuente. Algo como esto:

// UI metacode: 
service.GetObservableDeviceStates() 
    .DoOnTimeout(TimeSpan.FromSeconds(x),() => viewModel.CurrentState = "Idle") 
    .Subscribe(state => viewModel.CurrentState = state.ToString()); 

¿Cuáles son las mejores prácticas? Las posibles soluciones que vienen a la mente son (soy un novato Rx):

  1. Buffer (aunque no es tan legible)
  2. Jugando this Timeout overload;
  3. Volviendo algo especial "del lado del servicio" cuando nada cambia (en lugar de utilizar DistinctUntilChanged) y tratar con ella en el código de interfaz de usuario:

    service.GetObservableDeviceStates() .Subscribe (estado => viewModel.CurrentState = state.Special? "Idle": state.ToString());

EDIT: como se informó in the answer, la solución es:

 service.GetObservableDeviceStates() 
      .Do(onNext) 
      .Throttle(TimeSpan.FromSeconds(x)) 
      .Subscribe(onTimeout); 

Edit2 (advertencia)

Si los componentes actualizaciones de interfaz de usuario onNext y onTimeout, para evitar CrossThreadExceptions dos ObserveOn (uiSynchronizationContext) son necesarios, ya que Throttle trabaja en otro hilo.

 service.GetObservableDeviceStates() 
      .ObserveOn(uiSynchronizationContext) 
      .Do(onNext) 
      .Throttle(TimeSpan.FromSeconds(x)) 
      .ObserveOn(uiSynchronizationContext) 
      .Subscribe(onTimeout); 
+0

puede especificar el programador de subprocesos de interfaz de usuario como parámetro al acelerador con el fin de ejecutar los temporizadores de estrangulación allí, en caso de que quiera evitar el salto ObserveOn adicional. –

+0

@BartDeSmet Ya probé Throttle (x, Scheduler.CurrentThread), pero estaba perdiendo la capacidad de respuesta de la interfaz de usuario ... En mi caso, ObserveOn funcionó mejor – Notoriousxl

Respuesta

7

tiempo de espera es más o menos significaba para observaciones que representan operaciones asíncronas individuales - por ejemplo, para devolver un valor predeterminado o si dicho OnError observables no le ha notificado en una cierta cantidad de tiempo.

El operador que está buscando es Throttle, aunque al principio no lo parezca. Throttle(p) le proporciona una secuencia que produce un valor cuando la secuencia fuente no ha producido un valor para el período p.

Paralelo a su código existente, puede usar source.Throttle(period).Do(...side effect).

+0

¡Funciona como un amuleto, gracias! Siempre he usado Throttle para descartar elementos (y para consumir los restantes en la suscripción final), nunca para llamar a una devolución de llamada "onPause" :) – Notoriousxl

5

Personalmente evitaría el método Do para esto. Hace que el código en este ejemplo sea bastante fácil, pero encuentro que una vez que el uso de 'Hacer' se cuela en la base de código, pronto tiene spaghetti.

También podría considerar el uso de combinaciones de Amb, Timer, TakeUntil, Throttle, etc. para obtener el resultado que busca y seguir manteniendo el Monad *. O, en términos simples, supongo que idealmente quieres tener una secuencia de los valores de estado que llegan y no requerir la necesidad de poner un temporizador en tu código (es decir, descargarlo al servicio).

public IObservable<DeviceStatus> GetObservableDeviceStates(TimeSpan silencePeriod) 
{ 
    return Observable.Create<DeviceStatus>(
    o=> 
    { 
     var idle = Observable.Timer(silencePeriod).Select(_=>new DeviceStatus("Idle")); 

     var polledStatus = Observable.Interval(TimeSpan.FromSeconds(0.5)) 
         .Select(tick => new DeviceStatus(_device.ReadValue())) 
         .DistinctUntilChanged() 
         .Publish(); 

     var subscription = (from status in polledStatus 
          from cont in Observable.Return(status).Concat(idle.TakeUntil(polledStatus)) 
          select cont) 
        .Subscribe(o); 

     return new CompositeDisposable(subscription, polledStatus.Connect()); 
    }); 
} 

Este código tiene ahora el Servicio de devolver un valor de estado de reposo una vez se ha producido el período especificado de cambio de silencio.

Esto significa que el código de interfaz de usuario meta se mantiene simple y la lógica relacionada con devicestatus permanezca donde pertenece

// UI metacode: 
service.GetObservableDeviceStates(TimeSpan.FromSeconds(2)) 
    .Subscribe(state => viewModel.CurrentState = state.ToString()); 
+1

+1 porque no se puede hacer suficiente hincapié en que 'Do' es de cierto tipo del mal. Sin embargo, creo que 'Timestamp' |>' CombineLatest' podría ser un poco más eficaz en este caso. – Asti

+0

¿Siente? Trabajamos en una ciencia aplicada. Solo proporciona una prueba. Declarar recomendaciones de desempeño basadas en los sentimientos es un problema en nuestra industria que debe detenerse. –

+0

¡Nigromancia sagrada, Batman! Punto tomado Lee. Seré una mejor persona para eso. :) – Asti

Cuestiones relacionadas