2011-01-19 11 views
7

Quiero usar Reactive Extensions para transformar algunos mensajes y transmitirlos después de un pequeño retraso.Demora y deduplicación con Reactive Extensions (Rx)

Los mensajes tienen el siguiente aspecto:

class InMsg 
{ 
    int GroupId { get; set; } 
    int Delay { get; set; } 
    string Content { get; set; } 
} 

La salida se ve algo como esto:

class OutMsg 
{ 
    int GroupId { get; set; } 
    string Content { get; set; } 
    OutMsg(InMsg in) 
    { 
     GroupId = in.GroupId; 
     Content = Transform(in.Content); // function omitted 
    } 
} 

Hay un par de requisitos:

  • La duración de la demora depende del contenido del mensaje.
  • Cada mensaje tiene un ID de grupo
  • Si aparece un mensaje más reciente con el mismo ID de grupo que un mensaje retrasado esperando la transmisión, el primer mensaje se debe eliminar y solo el segundo se transmitirá después de un nuevo período de retraso.

Dado un observable <InMsg> y una función Enviar:

IObservable<InMsg> inMsgs = ...; 

void Send(OutMsg o) 
{ 
    ... // publishes transformed messages 
} 

entiendo que puedo usar Seleccionar para realizar la transformación.

void SetUp() 
{ 
    inMsgs.Select(i => new OutMsg(i)).Subscribe(Send); 
} 
  • ¿Cómo puedo aplicar un mensaje especifican retrasar? (Tenga en cuenta que esto podría/debería dar como resultado una entrega de mensajes fuera de servicio.)
  • ¿Cómo puedo deducir los mensajes con el mismo ID de grupo?
  • ¿Resuelve Rx este problema?
  • ¿Hay alguna otra forma de solucionar esto?

Respuesta

7

Puede utilizar GroupBy hacer una IGroupedObservable, Delay para retrasar la salida, y Switch para asegurarse de que los valores nuevos reemplazan a los valores anteriores de su grupo:

IObservable<InMsg> inMessages; 

inMessages 
    .GroupBy(msg => msg.GroupId) 
    .Select(group => 
     { 
      return group.Select(groupMsg => 
       { 
        TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay); 
        OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here 

        return Observable.Return(outMsg).Delay(delay); 
       }) 
       .Switch(); 
     }) 
     .Subscribe(outMsg => Console.Write("OutMsg received")); 

Una nota sobre la aplicación: si un valor agrupados llegó después de se envía el mensaje (es decir. después del retardo), se iniciará un nuevo retardo

+0

He tenido una jugada con esto y no hace exactamente lo que esperaba. La suscripción recibe un "System.Collections.Generic.AnonymousObservable'1 [OutMsg]" – chillitom

+0

Parece que no está llamando a 'Switch'.Si "pasa el mouse sobre" 'Seleccionar' en Visual Studio debería decirle que devuelve un' IObservable '. Si devuelve 'IObservable >', no está llamando a Switch –

0

el marco Rx resuelve el retardo utilizando the Delay método de extensión. La cola con deduplicación podría resolverse aplicando un ordenamiento LINQ regular después del Delay, y luego haciendo un DistinctUntilChanged.

Actualización: Yo admint, el enfoque de retardo aquí no funcionará solo. De alguna manera, necesitas poner en cola los mensajes entrantes dentro de la demora que estás buscando. Esto se logra mediante el método de extensión BufferWithTime. Este método devolverá listas de mensajes, que luego puede pelar por duplicados antes de publicarlos en el siguiente observador en línea.

Cuestiones relacionadas