Quiero usar Reactive Extensions para transformar algunos mensajes y transmitirlos después de un pequeño retraso.Demora y deduplicación con Reactive Extensions (Rx)
Los mensajes tienen el siguiente aspecto:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
La salida se ve algo como esto:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
Hay un par de requisitos:
- La duración de la demora depende del contenido del mensaje.
- Cada mensaje tiene un ID de grupo
- Si aparece un mensaje más reciente con el mismo ID de grupo que un mensaje retrasado esperando la transmisión, el primer mensaje se debe eliminar y solo el segundo se transmitirá después de un nuevo período de retraso.
Dado un observable <InMsg> y una función Enviar:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
entiendo que puedo usar Seleccionar para realizar la transformación.
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- ¿Cómo puedo aplicar un mensaje especifican retrasar? (Tenga en cuenta que esto podría/debería dar como resultado una entrega de mensajes fuera de servicio.)
- ¿Cómo puedo deducir los mensajes con el mismo ID de grupo?
- ¿Resuelve Rx este problema?
- ¿Hay alguna otra forma de solucionar esto?
He tenido una jugada con esto y no hace exactamente lo que esperaba. La suscripción recibe un "System.Collections.Generic.AnonymousObservable'1 [OutMsg]" – chillitom
Parece que no está llamando a 'Switch'.Si "pasa el mouse sobre" 'Seleccionar' en Visual Studio debería decirle que devuelve un' IObservable '. Si devuelve 'IObservable >', no está llamando a Switch –