2012-04-02 8 views
7

Lo siento si el título no está muy claro, no podía pensar en nada mejor ...En Rx, ¿cómo agrupar los últimos artículos después de un período de tiempo?

que estoy recibiendo la entrada del usuario en forma de un IObservable<char>, y me gustaría transformarlo a una IObservable<char[]>, agrupando los caracteres cada vez que el usuario deja de escribir durante más de 1 segundo. Así, por ejemplo, si la entrada es la siguiente:

h 
e 
l 
l 
o 
(pause) 
w 
o 
r 
l 
d 
(pause) 
! 
(pause) 

me gustaría la salida observable a ser:

['h', 'e', 'l', 'l', 'o'] 
['w', 'o', 'r', 'l', 'd'] 
['!'] 

Sospecho que la solución es bastante simple, pero no puedo encontrar el enfoque correcto ... Traté de usar Buffer, GroupByUntil, Throttle y algunos otros, fue en vano.

¡Cualquier idea sería bienvenida!


EDIT: Tengo algo que casi funciona:

 _input.Buffer(() => _input.Delay(TimeSpan.FromSeconds(1))) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

pero necesito el retraso que se restablece cada vez que se escribe un nuevo personaje ...

Respuesta

7

Buffer y Throttle sería suficiente, si su fuente es caliente. Para calentarlo, puede usar .Publish().RefCount() para asegurarse de que solo termine con una suscripción a la fuente.

IObservable<IList<T>> BufferWithInactivity<T>(this IObservable<T> source, 
               TimeSpan dueTime) 
{ 
    if (source == null) throw new ArgumentNullException("source"); 
    //defer dueTime checking to Throttle 
    var hot = source.Publish().RefCount(); 
    return hot.Buffer(() => hot.Throttle(dueTime)); 
} 
+0

Gracias, funciona muy bien y es mucho más elegante que mi solución. En realidad, mi fuente ya está caliente (es un 'Asunto ' que alimentar desde eventos de entrada); No estoy seguro de cuál es el impacto del uso de 'Publish(). RefCount()' es ... –

+0

@ThomasLevesque Si su fuente ya está activa, creo que Publish/RefCount serán algunas capas de contenedor desperdiciadas. Si desea utilizar esto como una función de caso general, probablemente simplemente los deje a menos que se demuestre que son un problema en su aplicación.Si lo usa solo por esta vez, cambie el parámetro a 'hotSource' y deje una nota en los comentarios del documento y debe poder quitar Publish/RefCount. –

0

OK, encontrado una solución:

 Func<IObservable<char>> bufferClosingSelector = 
      () => 
      _input.Timeout(TimeSpan.FromSeconds(1)) 
        .Catch(Observable.Return('\0')) 
        .Where(i => i == '\0'); 
     _input.Buffer(bufferClosingSelector) 
       .ObserveOnDispatcher() 
       .Subscribe(OnCompleteInput); 

Básicamente, el bufferClosingSelector empuja algo cada vez que se produce un tiempo de espera, WHI ch cierra el buffer actual. Probablemente haya una manera más simple y más elegante, pero funciona ... Estoy abierto a mejores sugerencias;)

0

Escribí una extensión hace algún tiempo para hacer lo que está buscando - BufferWithInactivity.

aquí está:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source, 
    TimeSpan inactivity, 
    int maximumBufferSize) 
{ 
    return Observable.Create<IEnumerable<T>>(o => 
    { 
     var gate = new object(); 
     var buffer = new List<T>(); 
     var mutable = new SerialDisposable(); 
     var subscription = (IDisposable)null; 
     var scheduler = Scheduler.ThreadPool; 

     Action dump =() => 
     { 
      var bts = buffer.ToArray(); 
      buffer = new List<T>(); 
      if (o != null) 
      { 
       o.OnNext(bts); 
      } 
     }; 

     Action dispose =() => 
     { 
      if (subscription != null) 
      { 
       subscription.Dispose(); 
      } 
      mutable.Dispose(); 
     }; 

     Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted = 
      onAction => 
      { 
       lock (gate) 
       { 
        dispose(); 
        dump(); 
        if (o != null) 
        { 
         onAction(o); 
        } 
       } 
      }; 

     Action<Exception> onError = ex => 
      onErrorOrCompleted(x => x.OnError(ex)); 

     Action onCompleted =() => onErrorOrCompleted(x => x.OnCompleted()); 

     Action<T> onNext = t => 
     { 
      lock (gate) 
      { 
       buffer.Add(t); 
       if (buffer.Count == maximumBufferSize) 
       { 
        dump(); 
        mutable.Disposable = Disposable.Empty; 
       } 
       else 
       { 
        mutable.Disposable = scheduler.Schedule(inactivity,() => 
        { 
         lock (gate) 
         { 
          dump(); 
         } 
        }); 
       } 
      } 
     }; 

     subscription = 
      source 
       .ObserveOn(scheduler) 
       .Subscribe(onNext, onError, onCompleted); 

     return() => 
     { 
      lock (gate) 
      { 
       o = null; 
       dispose(); 
      } 
     }; 
    }); 
} 
+0

¡Gracias! Sin embargo, no es exactamente "más simple" que mi solución;) –

0

Esto se debe trabajar. No es tan conciso como su solución, ya que implementa la lógica a través de una clase en lugar de métodos de extensión, pero podría ser una mejor manera de hacerlo. En resumen: cada vez que obtenga un char, agréguelo a List y (re) inicie un temporizador que expirará en un segundo; cuando expira el temporizador, notifique a nuestros suscriptores con el List como una matriz y restablezca el estado para que esté listo para la próxima vez.

class Breaker : IObservable<char[]>, IObserver<char> 
    { 
     List<IObserver<char[]>> observers = new List<IObserver<char[]>>(); 
     List<char> currentChars; 
     DispatcherTimer t; 
     public Breaker(IObservable<char> source) 
     { 
      source.Subscribe(this); 
      t = new DispatcherTimer { Interval = new TimeSpan(0, 0, 1) }; 
      t.Tick += TimerOver; 
      currentChars = new List<char>(); 
     } 
     public IDisposable Subscribe(IObserver<char[]> observer) 
     { 
      observers.Add(observer); 
      return null; //TODO return a useful IDisposable 
     } 
     public void OnCompleted() 
     { 
      //TODO implement completion logic 
     } 
     public void OnError(Exception e) 
     { 
      //TODO implement error logic 
     } 
     public void OnNext(char value) 
     { 
      currentChars.Add(value); 
      t.Start(); 
     } 
     void TimerOver(object sender, EventArgs e) 
     { 
      char[] chars = currentChars.ToArray(); 
      foreach (var obs in observers) 
       obs.OnNext(chars); 
      currentChars.Clear(); 
      t.Stop(); 
     } 
    } 
Cuestiones relacionadas