2012-05-03 9 views
5

Tengo una gran colección de clase simple par:usar RX para desencadenar eventos en diferentes momentos?

public class Pair { public DateTime Timestamp; public double Value; } 

Están ordenados por la marca de tiempo ascendente. Quiero activar un evento con el valor (por ejemplo, acción <doble>) para cada elemento en la lista en el momento apropiado. Los tiempos están en el pasado, así que necesito normalizar las marcas de tiempo de modo que el primero en la lista sea "ahora". ¿Podemos configurar esto con las Extensiones Reactivas de modo que desencadene el siguiente evento después de la diferencia de tiempo entre dos elementos?

+0

¿Has echado un vistazo a http://reactiveproperty.codeplex.com/? – dwerner

Respuesta

6

Say pairs es su secuencia:

var obs = pairs.OrderBy(p => p.Timestamp).ToObservable(); 

Ahora es obs pares como un observable ordenada.

Observable.Zip(
    obs, 
    obs.Take(1).Concat(obs), 
    (pair1, pair2) => Observable.Timer(pair1.Timestamp - pair2.Timestamp) 
     .Select(_ => pair1.Value)) 
.Concat() 
.Subscribe(/* Do something here */); 

El zip se encarga de convertir los tiempos absolutos en desplazamientos. Se llevará a la secuencia y unirlo con ella misma, pero compensado por una, de la siguiente manera

Original 1--2--4--7--11 
Offset 1--1--2--4--7--11 
Joined 0--1--2--3--4 

Este nuevo valor se pone entonces en el Observable.Timer retrasarla la cantidad apropiada. El Concat final aplana el resultado de IObservable<IObservable<double>> en un IObservable<double>. Esto supone que tu secuencia está ordenada.

+0

Buena solución. Agregaría 'var orderedObs = pairs.OrderBy (p => p.Timestamp) .ToObservable()' para que sea obvio lo que necesita pasar y lo use en su lugar. Hice estos cambios .. – yamen

+0

Esto ayuda mucho. Lo usé para consultar datos históricos y reproducirlo como se grabó originalmente. Un simulador para probar que el nuevo sistema funciona. –

+0

Me tomó un tiempo averiguar qué estaba pasando exactamente, pero lo entiendo ahora. Rx es un mindf ***. Solución impresionante sin embargo. +1 – BFree

0

Creo que este problema es interesante, este sería mi primer intento.

static void RunPairs(IEnumerable<Pair> pairs, Action<double> pairEvent) 
{ 
    if (pairs == null || !pairs.Any() || pairEvent == null) 
    return; 

    // if we can promise the pairs are already sorted 
    // obviously we don't need this next line 
    pairs = pairs.OrderBy(p => p.Timestamp); 
    var first = pairs .First().Timestamp; 
    var wrapped = pairs.Select(p => new { Offset = (p.Timestamp - first), Pair = p }); 

    var start = DateTime.Now; 

    double interval = 250; // 1/4 second 
    Timer timer = new Timer(interval); 

    timer.AutoReset = true; 
    timer.Elapsed += (sender, elapsedArgs) => 
    { 
    var signalTime = elapsedArgs.SignalTime; 
    var elapsedTime = (signalTime - start); 

    var pairsToTrigger = wrapped.TakeWhile(wrap => elapsedTime > wrap.Offset).Select(w => w.Pair); 
    wrapped = wrapped.Skip(pairsToTrigger.Count()); 

    if (!wrapped.Any()) 
     timer.Stop(); 

    foreach (var pair in pairsToTrigger) 
     pairEvent(pair.Value);  
    }; 

    timer.Start(); 
} 
+0

Esto es innecesariamente complejo dado que Rx tiene extensiones como 'Timer',' Defer' y 'Delay'. – yamen

+0

@yamen Nunca he usado Rx ni tengo pensado hacerlo. Quería responder cómo hacerlo desde cero como un desafío porque pensé que era interesante :) lo siento si mi respuesta en este contexto es simplemente spam. – payo

+2

No hay necesidad de disculparse, espero que aprenda algo de las soluciones Rx anteriores. Su respuesta realmente sirve como un ejemplo de por qué Rx es genial :-) – yamen

2

Si por "el uso de Rx" me permiten sólo tiene que utilizar los programadores de Rx, entonces esta es una solución muy fácil:

Action<double> action = 
    x => 
     Console.WriteLine(x); 

var ts0 = pairs.Select(p => p.Timestamp).Min(); 

pairs 
    .ForEach(p => 
     Scheduler 
      .ThreadPool 
      .Schedule(
       p.Timestamp.Subtract(ts0), 
       () => action(p.Value))); 

Este utiliza el System.Interactive extensión ForEach, pero sólo podía utilizar un bucle regular foreach para cargar el programador.

He probado el código con los siguientes datos ficticios:

var pairs = new [] 
{ 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 30), Value = 1.1, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 45), Value = 1.2, }, 
    new Pair { Timestamp = new DateTime(2011, 1, 1, 7, 12, 40), Value = 1.3, }, 
}; 

espero que esto ayude.

+0

¿El programador tiene su propia cola? ¿O este código masticará todo el grupo de hilos? Me preocupa la escalabilidad de esta solución. – Brannon

+0

@Brannon: si recuerdo correctamente, los planificadores utilizan una clasificación de montón internamente para poner en cola las acciones.Además, un planificador solo ejecutará una acción a la vez y reutilizará el hilo actual si otra acción está lista para comenzar inmediatamente. Entonces solo usan un hilo a la vez. – Enigmativity

Cuestiones relacionadas