Lo que trato de hacer es almacenar en el búfer los eventos entrantes de algunos IObservable (vienen en ráfagas) y soltarlos más, pero uno por uno, en intervalos pares. De esta manera:Una forma de enviar eventos almacenados temporalmente a intervalos regulares
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
Ya que soy bastante nuevo en Rx, no estoy seguro de si ya hay un tema o un operador que hace precisamente esto. ¿Tal vez se puede hacer por composición?
actualización:
Gracias a Richard Szalay para señalar la drenaje operador, he encontrado otra example by James Miles del uso del operador de drenaje. Así es como me las arreglé para conseguir que funcione en un WPF aplicación:
.Drain(x => {
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher);
}).Subscribe();
que había un poco de diversión, debido a la omisión del parámetro planificador hace que el cierre de la aplicación en modo de depuración sin excepción a aparecer (I tenga que aprender para tratar con excepciones en Rx). El método de proceso modifica el estado de la interfaz de usuario directamente, pero supongo que es bastante simple hacer un IObservable (utilizando un objeto IS?).
actualización:
Mientras tanto, yo he estado experimentando con ISubject, la clase de abajo hace lo que quería - que deja escapar tamponada Ts en el momento oportuno:
public class StepSubject<T> : ISubject<T>
{
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
{
this.interval = interval;
this.scheduler = scheduler;
}
void Step()
{
T next;
lock (queue)
{
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
}
if (!idle)
{
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
}
}
public void OnNext(T value)
{
lock (queue)
queue.Enqueue(value);
if (idle)
cancel.Disposable = scheduler.Schedule(Step);
}
public IDisposable Subscribe(IObserver<T> observer)
{
subscriber = observer;
return cancel;
}
}
Esta ingenua la implementación se elimina de OnCompleted y OnError para mayor claridad, también solo se permite la suscripción única.
Su tema no es multi-hilo. Está comprobando si su cola está vacía dentro de un bloqueo, pero luego está quitando de la cerradura. –
¡Gracias, lo arreglé! – majocha