2011-09-29 11 views
21

estoy usando extensiones de reactivos para cotejar los datos en memorias intermedias de 100 ms:¿Las extensiones reactivas son compatibles con los búfers de laminación?

this.subscription = this.dataService 
    .Where(x => !string.Equals("FOO", x.Key.Source)) 
    .Buffer(TimeSpan.FromMilliseconds(100)) 
    .ObserveOn(this.dispatcherService) 
    .Where(x => x.Count != 0) 
    .Subscribe(this.OnBufferReceived); 

Esto funciona bien. Sin embargo, quiero un comportamiento ligeramente diferente al proporcionado por la operación Buffer. Esencialmente, quiero restablecer el temporizador si se recibe otro elemento de datos. Solo cuando no se hayan recibido datos para los 100 ms completos, quiero manejarlos. Esto abre la posibilidad de nunca manejar los datos, por lo que también debería poder especificar un recuento máximo. Me imagino algo en la línea de:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000) 

que he tenido una mirada alrededor y no he podido encontrar nada parecido en Rx? ¿Alguien puede confirmar/negar esto?

+0

Estoy seguro de que vi este comportamiento en uno de los videos del tutorial en Rx, pero me temo que no puedo recordar qué o dónde exactamente. :( – Chris

+0

Ah, acelerador (http://msdn.microsoft.com/en-us/library/hh229298%28v=vs.103%29.aspx) es en lo que estaba pensando pero no creo que haga qué lo que quiere por sí solo. No estoy seguro de si podría haber alguna forma de combinarlo para hacer lo que se quiere ... – Chris

Respuesta

12

Escribí una extensión para hacer la mayor parte de lo que está buscando - BufferWithInactivity.

Aquí está:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

+1 gracias. ¿Escribió esto solo para esta pregunta o para usted? ¿Se ha utilizado en el código de producción? –

+0

@KentBoogaart - Lo escribí hace meses, pero aún no está en el código de producción. Sigue siendo un WIP. – Enigmativity

+0

+1 Buen uso de SerialDisposable para el acelerador ... –

0

creo que esto puede ser implementado en la parte superior del Método de regulación como se muestra a continuación:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max) 
     { 
      return Observable.CreateWithDisposable<IList<T>>(cl => 
      { 
       var acc = new List<T>(); 
       return obs.Buffer(span) 
         .Subscribe(next => 
         { 
          if (next.Count == 0) //no activity in time span 
          { 
           cl.OnNext(acc); 
           acc.Clear(); 
          } 
          else 
          { 
           acc.AddRange(next); 
           if (acc.Count >= max) //max items collected 
           { 
            cl.OnNext(acc); 
            acc.Clear(); 
           } 
          } 
         }, err => cl.OnError(err),() => { cl.OnNext(acc); cl.OnCompleted(); }); 
      }); 
     } 

NOTA: Yo no lo he probado, pero espero que sea te da la idea.

12

Esto es posible combinando los métodos integrados Window y Throttle de Observable. En primer lugar, vamos a resolver el problema más sencillo cuando ignoramos la condición de máxima recuento:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay) 
{ 
    var closes = stream.Throttle(delay); 
    return stream.Window(() => closes).SelectMany(window => window.ToList()); 
} 

El potente Window method hicieron el trabajo pesado. Ahora es bastante fácil ver cómo agregar un recuento máximo:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null) 
{ 
    var closes = stream.Throttle(delay); 
    if (max != null) 
    { 
     var overflows = stream.Where((x,index) => index+1>=max); 
     closes = closes.Merge(overflows); 
    } 
    return stream.Window(() => closes).SelectMany(window => window.ToList()); 
} 

Voy a escribir una publicación explicando esto en mi blog. https://gist.github.com/2244036

documentación para el método de la ventana:

+0

Con el escenario BufferUntilInactive anterior: si el suscriptor es lento que el productor, puede ver un escenario donde el siguiente conjunto de elementos con ventana se almacenará en el búfer y no se enviará al suscriptor a menos que se genere un elemento ... –

+0

adjunto una muestra http://snipt.org/Bhao0. En Visual Studio (1) abra la ventana de salida (2) Verifique el botón de suspensión (3) Haga clic en el botón (4) espere a que se imprima "Haga clic ahora" en la consola. (5) presione el botón tres veces, verá que se pierden esos tres clics. –

3

con extensiones 2.0 Rx, su puede responder a ambas necesidades con un nuevo buffer sobrecargue la aceptación de un tiempo de espera y una talla:

this.subscription = this.dataService 
    .Where(x => !string.Equals("FOO", x.Key.Source)) 
    .Buffer(TimeSpan.FromMilliseconds(100), 1) 
    .ObserveOn(this.dispatcherService) 
    .Where(x => x.Count != 0) 
    .Subscribe(this.OnBufferReceived); 

Consulte https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx para la documentación.

+0

¿Pero esto no tendrá una ventana deslizante, con el tipo de comportamiento antirrebote que se solicitó? – Cocowalla

+0

@cocowalla Releí la pregunta original y el código que proporcioné cumple con todos los requisitos. Lo he usado en el código de producción con gran éxito. –

+0

Disculpe, quise decir específicamente el comportamiento antirrebote: "Quiero reiniciar el temporizador si se recibe otro elemento de datos" - ¿No veo que su código hace esto? AFAICS, su código siempre empujará el búfer al suscriptor cada 100 ms (siempre que no esté vacío) – Cocowalla

Cuestiones relacionadas