2010-08-16 21 views
7

Tengo un IObservable que produce valores a intervalos aleatorios y quiero acelerar esta secuencia. Una cosa que descubrí es que la definición de "estrangulamiento" del operador Throttle no es la misma que la mía.Extensiones reactivas: Acelerador/Muestra con intervalo variable

Throttle solo produce valores después de que transcurra el intervalo especificado con silencio (produce el último valor visto). Pensé que la limitación significaría producir valores en el intervalo especificado (a menos que haya silencio, por supuesto).

Digamos, esperaba Observable.Interval(100).Select((_,i) => i).Throttle(200) para producir (módulo cualquier problema de rendimiento/temporización) los números pares, ya que estoy acelerando a "velocidad media". Sin embargo, esa secuencia no produce ningún valor, porque nunca hay un período de silencio de longitud 200.

Por lo tanto, descubrí que Sample realmente tiene el comportamiento de "aceleración" que deseo. Observable.Interval(100).Select((_,i) => i).Sample(200) produce (de nuevo, módulo cualquier problema de rendimiento/temporización) la secuencia de números pares.

Sin embargo, tengo otro problema: el intervalo varía, dependiendo del último valor "muestreado". Lo que quiero es escribir un operador que tiene este aspecto:

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector); 

El parámetro intervalSelector produce el intervalo para la siguiente muestra, y la primera muestra ... o bien se toma al primer valor o de un parámetro adicional No me importa

Intenté escribir esto pero terminé con una gran construcción intrincada que no funcionó del todo bien. Mi pregunta es, ¿puedo construir esto usando los operadores existentes (también conocidos como "one-liner")?

Respuesta

5

Muchas horas después, y con un poco de sueño, lo conseguí.

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector) 
{ 
    return source.TimeInterval() 
       .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) => 
       { 
        if(v.Interval >= acc.Item1) 
        { 
         return Tuple.Create(intervalSelector(v.Value), true, v.Value); 
        } 
        return Tuple.Create(acc.Item1 - v.Interval, false, v.Value); 
       }) 
       .Where(t => t.Item2) 
       .Select(x => x.Item3); 
} 

Esto funciona como yo quiero: cada vez que se produce un valor de x, se detiene la producción de valores hasta que pase intervalSelector(x) tiempo.

0

¿No es lo que estás buscando for Observable.BufferWithTime?

+0

BufferWithTime sufre del mismo defecto que los demás: el intervalo de tiempo es constante. Necesito calcular cuánto tiempo esperar para tomar la siguiente muestra del último valor muestreado. Veré si puedo dibujar un diagrama de mármol para esto ... –

Cuestiones relacionadas