2012-08-11 8 views
10

IMPORTANTE: para obtener una descripción de los resultados y algunos detalles más, por favor, eche un vistazo también a mi respuestaoperador de Rx a secuencias distintas

que necesito para agrupar y filtrar una secuencia de objetos/eventos que por lo general son replicado, guardándolos en un buffer con un intervalo TimeSpan. Trato de explicarlo mejor con tipo de diagramas de mármol:

X-X-X-X-X-Y-Y-Y-Z-Z-Z-Z-X-X-Y-Z-Z 

produciría

X---Y---Z---X---Y---Z 

donde X, Y y Z son diferentes tipos de eventos, y '---' significa el intervalo. Además, también me gustaría a distinta por una propiedad clave que está disponible en todos los tipos, ya que tienen una clase base común:

X, Y, Z : A 

y A contiene una clave de propiedad. Utilizando la notación Xa significado X.Key = a, una muestra final sería:

X.a-X.b-X.a-Y.b-Y.c-Z.a-Z.a-Z.c-Z.b-Z.c 

produciría

X.a-X.b---Y.b-Y.c-Z.a-Z.c-Z.b 

¿Puede alguien ayudarme a armar los operadores LINQ requeridos (probablemente DistinctUntilChanged y tampón) a lograr este comportamiento? Gracias

ACTUALIZACIÓN 18/08/12:

conforme a lo solicitado, que tratan de dar una mejor explicación. Tenemos dispositivos que recopilan y envían eventos a un servicio web. Estos dispositivos tienen una lógica antigua (y no podemos cambiarla debido a la compatibilidad con versiones anteriores) y continuamente envían un evento hasta que reciben un acuse de recibo; después del reconocimiento, envían el siguiente evento en su cola, y así sucesivamente. Los eventos contienen la dirección de red de la unidad y algunas otras propiedades que distinguen los eventos en la cola para cada dispositivo. Un evento es el siguiente:

class Event 
{ 
    public string NetworkAddress { get; } 

    public string EventCode { get; } 

    public string AdditionalAttribute { get; } 
} 

El objetivo es el de procesamiento de cada 5 segundos los eventos distinguidos recibieron de todos los dispositivos, almacenamiento de la información en la base de datos (por eso no queremos hacerlo en lotes) y enviando la respuesta al dispositivo. Vamos a hacer un ejemplo con sólo dos dispositivos y algunos eventos:

Device 'a': 
Event 1 (a1): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'x' 
Event 2 (a2): NetworkAddress = '1', EventCode = A, AdditionalAttribute = 'y' 
Event 3 (a3): NetworkAddress = '1', EventCode = B, AdditionalAttribute = 'x' 

Device 'b': 
Event 1 (b1): NetworkAddress = '2', EventCode = A, AdditionalAttribute = 'y' 
Event 2 (b2): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'x' 
Event 3 (b3): NetworkAddress = '2', EventCode = B, AdditionalAttribute = 'y' 
Event 4 (b4): NetworkAddress = '2', EventCode = C, AdditionalAttribute = 'x' 

Pn are the operations done by our server, explained later 

posible diagrama de mármol (flujos de entrada + flujo de salida):

Device 'a'   : -[a1]-[a1]-[a1]----------------[a2]-[a2]-[a2]-[a3]-[a3]-[a3]-... 
Device 'b'   : ------[b1]-[b1]-[b2]-[b2]-[b2]------[b3]-[b3]-[b4]-[b4]-[b4]-... 

Time    : ------------[1s]-----------[2s]------------[3s]------------[4s]- 
DB/acks (rx output) : ------------[P1]-----------[P2]------------[P3]------------[P4]- 

P1: Server stores and acknowledges [a1] and [b1] 
P2: "  "  " "   [b2] 
P3: "  "  " "   [a2] and [b3] 
P4: "  "  " "   [a3] and [b4] 

Al final creo que es probablemente una simple combinación de operadores básicos, pero soy nuevo en Rx y estoy un poco confundido ya que parece que hay muchos operadores (o combinaciones de operadores) para obtener el mismo flujo de salida.

Actualización 19/08/12:

Por favor, tenga en cuenta que este código se ejecuta en el servidor y debe funcionar durante días sin pérdidas de memoria ... no estoy seguro sobre el comportamiento de los sujetos.Por el momento, para cada evento llamo a una operación de inserción en un servicio, que llama a OnNext de un Asunto sobre el cual debo construir la consulta (si no estoy equivocado sobre el uso de temas).

actualización 20.08.12:

aplicación actual, incluyendo la prueba de validación; esto es lo que he intentado y parece que el mismo sugerida por @yamen

public interface IEventService 
{ 
    // Persists the events 
    void Add(IEnumerable<Event> events); 
} 

public class Event 
{ 
    public string Description { get; set; } 
} 

/// <summary> 
/// Implements the logic to handle events. 
/// </summary> 
public class EventManager : IDisposable 
{ 
    private static readonly TimeSpan EventHandlingPeriod = TimeSpan.FromSeconds(5); 

    private readonly Subject<EventMessage> subject = new Subject<EventMessage>(); 

    private readonly IDisposable subscription; 

    private readonly object locker = new object(); 

    private readonly IEventService eventService; 

    /// <summary> 
    /// Initializes a new instance of the <see cref="EventManager"/> class. 
    /// </summary> 
    /// <param name="scheduler">The scheduler.</param> 
    public EventManager(IEventService eventService, IScheduler scheduler) 
    { 
     this.eventService = eventService; 
     this.subscription = this.CreateQuery(scheduler); 
    } 

    /// <summary> 
    /// Pushes the event. 
    /// </summary> 
    /// <param name="eventMessage">The event message.</param> 
    public void PushEvent(EventMessage eventMessage) 
    { 
     Contract.Requires(eventMessage != null); 
     this.subject.OnNext(eventMessage); 
    } 

    /// <summary> 
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. 
    /// </summary> 
    /// <filterpriority>2</filterpriority> 
    public void Dispose() 
    { 
     this.Dispose(true); 
    } 

    private void Dispose(bool disposing) 
    { 
     if (disposing) 
     { 
      // Dispose unmanaged resources 
     } 

     this.subject.Dispose(); 
     this.subscription.Dispose(); 
    } 

    private IDisposable CreateQuery(IScheduler scheduler) 
    { 
     var buffered = this.subject 
      .DistinctUntilChanged(new EventComparer()) 
      .Buffer(EventHandlingPeriod, scheduler); 

     var query = buffered 
      .Subscribe(this.HandleEvents); 
     return query; 
    } 

    private void HandleEvents(IList<EventMessage> eventMessages) 
    { 
     Contract.Requires(eventMessages != null); 
     var events = eventMessages.Select(this.SelectEvent); 
     this.eventService.Add(events); 
    } 

    private Event SelectEvent(EventMessage message) 
    { 
     return new Event { Description = "evaluated description" }; 
    } 

    private class EventComparer : IEqualityComparer<EventMessage> 
    { 
     public bool Equals(EventMessage x, EventMessage y) 
     { 
      return x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute; 
     } 

     public int GetHashCode(EventMessage obj) 
     { 
      var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute); 
      return s.GetHashCode(); 
     } 
    } 
} 

public class EventMessage 
{ 
    public string NetworkAddress { get; set; } 

    public byte EventCode { get; set; } 

    public byte Attribute { get; set; } 

    // Other properties 
} 

Y la prueba:

public void PushEventTest() 
    { 
     const string Address1 = "A:2.1.1"; 
     const string Address2 = "A:2.1.2"; 

     var eventServiceMock = new Mock<IEventService>(); 

     var scheduler = new TestScheduler(); 
     var target = new EventManager(eventServiceMock.Object, scheduler); 
     var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 }; 
     var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     scheduler.Schedule(() => target.PushEvent(eventMessageA1)); 
     scheduler.Schedule(TimeSpan.FromSeconds(1),() => target.PushEvent(eventMessageB1)); 
     scheduler.Schedule(TimeSpan.FromSeconds(2),() => target.PushEvent(eventMessageA1)); 

     scheduler.AdvanceTo(TimeSpan.FromSeconds(6).Ticks); 

     eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 2)), Times.Once()); 

     scheduler.Schedule(TimeSpan.FromSeconds(3),() => target.PushEvent(eventMessageB1)); 

     scheduler.AdvanceTo(TimeSpan.FromSeconds(11).Ticks); 

     eventServiceMock.Verify(s => s.Add(It.Is<List<Event>>(list => list.Count == 1)), Times.Once()); 
    } 

Además, hago notar de nuevo que es muy importante que el software podía correr durante días sin problemas, manejando miles de mensajes. Para dejarlo en claro: la prueba no pasa con la implementación actual.

+1

La secuencia final en su pregunta 'X.a-X.b --- Y.b-Y.c-Z.a-Z.c-Z.b' sólo muestra un' '--- intervalo. ¿Es correcto o debería ser el intervalo entre cada valor? – Enigmativity

+0

Sería útil si proporcionó los diagramas de mármol como fuente y objetivo 'a escala' uno debajo del otro, o si proporcionó un ejemplo 'real' para que lo ayudemos. – yamen

+0

Gracias por la captura Enigmatividad, arreglaré la salida @yamen Añadiré más detalles – fra

Respuesta

4

No estoy seguro de si esto hace exactamente lo que le gustaría, pero puede agrupar los elementos explícitamente utilizando la palabra clave group, y luego manipular los diversos IObservable s por separado antes de recombinarlos.

E.g. si tenemos definiciones de clase como

class A 
{ 
    public char Key { get; set; } 
} 

class X : A { } 
... 

y una Subject<A>

Subject<A> subject = new Subject<A>(); 

entonces podemos escribir

var buffered = 
    from a in subject 
    group a by new { Type = a.GetType(), Key = a.Key } into g 
    from buffer in g.Buffer(TimeSpan.FromMilliseconds(300)) 
    where buffer.Any() 
    select new 
    { 
     Count = buffer.Count, 
     Type = buffer.First().GetType().Name, 
     Key = buffer.First().Key 
    }; 

buffered.Do(Console.WriteLine).Subscribe(); 

podemos probar esto con los datos que nos ha facilitado:

subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100); 
subject.OnNext(new X { Key = 'b' }); 
Thread.Sleep(100); 
subject.OnNext(new X { Key = 'a' }); 
Thread.Sleep(100); 
... 
subject.OnCompleted(); 

Para ge t la salida que proporcionó:

{ Count = 2, Type = X, Key = a } 
{ Count = 1, Type = X, Key = b } 
{ Count = 1, Type = Y, Key = b } 
{ Count = 1, Type = Y, Key = c } 
{ Count = 2, Type = Z, Key = a } 
{ Count = 2, Type = Z, Key = c } 
{ Count = 1, Type = Z, Key = b } 
+0

Aunque no sea 100% preciso, asigno aquí la recompensa porque en el momento de la respuesta la pregunta no era lo suficientemente clara. De todos modos, la respuesta está bastante bien argumentada. – fra

2

No estoy seguro si esto es exactamente lo que quiere, pero parece respaldar sus casos de uso.

En primer lugar, vamos a definir la clase base a utilizar (se puede modificar fácilmente esto para satisfacer sus necesidades):

public class MyEvent 
{ 
    public string NetworkAddress { set; get; } 
    public string EventCode { set; get; } 
} 

Vamos a configurar sus dispositivos como una matriz de IObservable<MyEvent> - es posible que tenga a disposición de manera diferente, y el siguiente debería cambiar para acomodar eso, por supuesto. Estos dispositivos producirán cada uno un valor con un retraso aleatorio de entre 0,5 y 1,5 segundos.

var deviceA = new MyEvent[] { new MyEvent() {NetworkAddress = "A", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "A", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "A", EventCode = "2"} }; 

var deviceB = new MyEvent[] { new MyEvent() {NetworkAddress = "B", EventCode = "1"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "2"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "2"}, 
           new MyEvent() {NetworkAddress = "B", EventCode = "3"} }; 

var random = new Random();         

var deviceARand = deviceA.ToObservable().Select(a => Observable.Return(a).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat(); 
var deviceBRand = deviceB.ToObservable().Select(b => Observable.Return(b).Delay(TimeSpan.FromMilliseconds(random.Next(500,1500)))).Concat(); 

var devices = new IObservable<MyEvent>[] { deviceARand, deviceBRand }; 

Ahora vamos a tomar todas estas secuencias de dispositivos individuales, los hacen 'distinta', y fusionarlas en una sola corriente principal:

var stream = devices.Aggregate(Observable.Empty<MyEvent>(), (acc, device) => acc.DistinctUntilChanged(a => a.EventCode).Merge(device)); 

Una vez que haya que conseguir esta corriente para ser consumidos es periódicamente sólo una cuestión de amortiguar con Buffer:

stream.Buffer(TimeSpan.FromSeconds(1)).Subscribe(x => { /* code here works on a list of the filtered events per second */ }); 
0

Después de búsquedas y experimentos, que arme un código que produce el resultado que espero:

static void Main(string[] args) 
    { 
     const string Address1 = "A:2.1.1"; 
     const string Address2 = "A:2.1.2"; 
     var comparer = new EventComparer(); 
     var eventMessageA1 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 4 }; 
     var eventMessageB1 = new EventMessage { NetworkAddress = Address2, EventCode = 1, Attribute = 5 }; 
     var eventMessageA2 = new EventMessage { NetworkAddress = Address1, EventCode = 1, Attribute = 5 }; 
     var list = new[] { eventMessageA1, eventMessageA1, eventMessageB1, eventMessageA2, eventMessageA1, eventMessageA1 }; 

     var queue = new BlockingCollection<EventMessage>(); 
     Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe 
      (
       l => list.ToList().ForEach(m => 
       { 
        Console.WriteLine("Producing {0} on thread {1}", m, Thread.CurrentThread.ManagedThreadId); 
        queue.Add(m); 
       }) 
      ); 

     // subscribing 
     queue.GetConsumingEnumerable() 
      .ToObservable() 
      .Buffer(TimeSpan.FromSeconds(5)) 
      .Subscribe(e => 
       { 
        Console.WriteLine("Queue contains {0} items", queue.Count); 
        e.Distinct(comparer).ToList().ForEach(m => 
        Console.WriteLine("{0} - Consuming: {1} (queue contains {2} items)", DateTime.UtcNow, m, queue.Count)); 
       } 
      ); 

     Console.WriteLine("Type enter to exit"); 
     Console.ReadLine(); 
    } 

    public class EventComparer : IEqualityComparer<EventMessage> 
    { 
     public bool Equals(EventMessage x, EventMessage y) 
     { 
      var result = x.NetworkAddress == y.NetworkAddress && x.EventCode == y.EventCode && x.Attribute == y.Attribute; 
      return result; 
     } 

     public int GetHashCode(EventMessage obj) 
     { 
      var s = string.Concat(obj.NetworkAddress + "_" + obj.EventCode + "_" + obj.Attribute); 
      return s.GetHashCode(); 
     } 
    } 

    public class EventMessage 
    { 
     public string NetworkAddress { get; set; } 

     public byte EventCode { get; set; } 

     public byte Attribute { get; set; } 

     public override string ToString() 
     { 
      const string Format = "{0} ({1}, {2})"; 
      var s = string.Format(Format, this.NetworkAddress, this.EventCode, this.Attribute); 
      return s; 
     } 
    } 

De todos modos, al monitorear la aplicación, parece que esto provoca una pérdida de memoria. Mi pregunta es ahora:

  • ¿Qué está causando la pérdida de memoria? [consulte la actualización a continuación]
  • es esta la mejor manera de hacerlo (si pongo lo distinto en el primer observable, no obtengo los otros eventos en los siguientes almacenamientos intermedios, pero los elementos en cada almacenamiento intermedio deben estar aislados de otros)?
  • ¿cómo puedo escribir una prueba usando el planificador de prueba?

ACTUALIZACIÓN:

parece que el incremento de memoria sólo dura algunos minutos, entonces el valor es estable. Haré una prueba larga. Por supuesto, esto sería un comportamiento absolutamente aceptable.

ACTUALIZACIÓN 26/08/12:

  • como ya he mencionado en la anterior actualización, el uso de memoria aumenta solamente (y lentamente) durante algunos minutos después del inicio. Después de 8 horas de la memoria consumida era estable, con fluctuaciones normales en el rango de pocos KB)
  • este question es muy similar a la mía y la extensión de drenaje sugerido podría aplicarse también a mi problema (aún no se ha verificado)

De todos modos, creo que mi pregunta aún está abierta para las pruebas de unidad usando el planificador de prueba.

gracias Francesco