2012-03-05 18 views
7

He estado buscando ejemplos sobre cómo usar Observable.Buffer en rx pero no puedo encontrar nada más sustancial que el material de buffer de tiempo de la placa de la caldera.¿Es posible Observable.Buffer en otra cosa que no sea el tiempo

Parece que hay una sobrecarga para especificar un "bufferClosingSelector" pero no puedo entenderlo.

Lo que intento hacer es crear una secuencia que almacena temporalmente por tiempo o por "acumulación". Considere un flujo de solicitud donde cada solicitud tiene algún tipo de peso y no quiero procesar más de x peso acumulado a la vez, o si no se ha acumulado lo suficiente, solo déme lo que ha venido en el último intervalo de tiempo (Buffer regular funcionalidad)

Respuesta

13

bufferClosingSelector es una función llamada cada vez para obtener un Observable que producirá un valor cuando se espera que el búfer se cierre.

Por ejemplo,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) obras como el Buffer(time) sobrecarga regular.

En lo que desea ponderar una secuencia, puede aplicar un Scan sobre la secuencia y luego decidir sobre su condición de agregación.

Por ejemplo, source.Scan((a,c) => a + c).SkipWhile(a => a < 100) le da una secuencia que produce un valor cuando la secuencia de origen ha añadido hasta más de 100.

Puede utilizar Amb de raza estas dos condiciones de cierre para ver que reacciona en primer lugar:

 .Buffer(() => Observable.Amb 
        (
          Observable.Timer(TimeSpan.FromSeconds(1)), 
          source.Scan((a,c) => a + c).SkipWhile(a => a < 100) 
        ) 
       ) 

Puede usar cualquier serie de combinadores que produzca algún valor para que el búfer se cierre en ese punto.

Nota: El valor otorgado al selector de cierre no importa; es la notificación lo que importa. Por lo tanto, para combinar fuentes de diferentes tipos con Amb simplemente cámbielo a System.Reactive.Unit.

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit()) 
+0

Sólo una nota rápida, no parece amb a trabajar cuando la fuente es un observable de tipo distinto a continuación, a largo – Dmitry

+0

@Dmitry Estaba dando la idea básica. Lo he editado para incluir un ejemplo de diferentes tipos. – Asti

+0

¿Es posible acceder al valor de cierre del buffer desde el observador? P.ej. el buffer de timestamp se usa para cerrar. – liang

Cuestiones relacionadas