2011-07-07 8 views
7

Necesito implementar una versión de CombineLatest (la llamaré WithLatest aquí) que llama al selector para cada elemento a la izquierda y el último elemento a la derecha. No debería presionar para que los artículos de la derecha cambien solamente.CombineLatest, pero solo presione para la izquierda

Creo que si esto está construido Observable.Create o una combinación de extensiones existentes no es particularmente importante; Voy a hacer de esto un método de extensión "en caja" de cualquier manera.

Ejemplo

var left = new Subject<int>(); 
var right = new Subject<int>(); 

left.WithLatest(right, (l,r) => l + " " + r).Dump(); 

left.OnNext(1); // <1> 
left.OnNext(2); // <2> 
right.OnNext(1); // <3> 
right.OnNext(2); // <4> 
left.OnNext(3); // <5> 

debe ceder

2 1 
3 2 

Editar: La lógica de mi ejemplo va:

  1. izquierda se puebla con 1. Derecho está vacía, no hay valores empujados
  2. Izquierda se actualiza con 2 (olvida el valor anterior). La derecha todavía está vacía, por lo que no se empuja nada.
  3. La derecha se rellena con 1, por lo que se pulsa Izquierda = 2 (el último valor), Derecha = 1. Hasta este punto, no hay diferencia entre WithLatest y CombineLatest
  4. Se actualiza el derecho - no se empuja nada. Esto es lo que es diferente
  5. La izquierda se actualiza con 3, por lo que se empuja a la izquierda = 3, a la derecha = 2 (el último valor).

Se ha sugerido que trato:

var lr = right.ObserveOn(Scheduler.TaskPool).Latest(); 
left.Select(l => l + " " + lr.First()).Dump(); 

pero esto bloquea en el subproceso actual para mi prueba.

+0

¿Desea hacer esto solo con combinators existentes, o es una implementación usando 'Create()' una opción? –

+1

Su ejemplo no coincide con lo que esperaría de su descripción. El primer elemento devuelto en su ejemplo parece ser desencadenado por un cambio en la derecha observable. ¿Debería activarse 'WithLatest' en la primera derecha si ya hay artículos de la izquierda? –

+1

@Gideon Engelberth: De acuerdo. Eso me dejó perplejo también. De acuerdo con la descripción, el algoritmo debería producir "3 2" solamente. – 3dGrabber

Respuesta

0

Aquí es la manera hacky mediante Crear - en realidad no construirlo, mea culpa, si no en realidad el trabajo :)

public static IObservable<TRet> WithLatest<TLeft, TRight, TRet>(
     this IObservable<TLeft> lhs, 
     IObservable<TRight> rhs, 
     Func<TLeft, TRight, TRet> sel) 
{ 
    return Observable.Create<TRet>(subj => { 
     bool rhsSet = false; 
     bool deaded = false; 
     var latestRhs = default(TRight); 

     Action onDeaded = null; 

     var rhsDisp = rhs.Subscribe(
      x => { latestRhs = x; rhsSet = true; }, 
      ex => { subj.OnError(ex); onDeaded(); }); 

     var lhsDisp = lhs 
      .Where(_ => deaded == false && rhsSet == true) 
      .Subscribe(
       x => subj.OnNext(sel(x, latestRhs)), 
       ex => { subj.OnError(ex); onDeaded(); }, 
       () => { subj.OnCompleted(); onDeaded(); }); 

     onDeaded =() => { 
      deaded = true; 
      if (lhsDisp != null) { 
       lhsDisp.Dispose(); 
       lhsDisp = null; 
      } 
      if (rhsDisp != null) { 
       rhsDisp.Dispose(); 
       rhsDisp = null; 
      } 
     }; 

     return onDeaded; 
    }); 
} 
+0

Obtengo "1 0", "2 0" para mi ejemplo. –

+0

Vaya, mi cláusula where fue mala, pruébela ahora –

+0

Ahora es solo "3 2" –

4

También tenían la misma necesidad de un CombineLatest la que "empuja sólo para la izquierda".

Hice la solución de una "sobrecarga" de Observable.Sample, porque eso es lo que el método no hace:
It muestras una source (derecha) con un sampler (izquierda), con la capacidad adicional de proporcionar un resultSelector (como en CombineLatest)

public static IObservable<TResult> Sample<TSource, TSample, TResult>(
    this IObservable<TSource> source, 
    IObservable<TSample> sampler, 
    Func<TSource, TSample, TResult> resultSelector) 
{ 
    var multiSampler = sampler.Publish().RefCount(); 
    return source.CombineLatest(multiSampler, resultSelector).Sample(multiSampler); 
} 
+0

Uso para el ejemplo anterior: 'right.Sample (izquierda, (r, l) => l +" "+ r) .Dump();' Produce "3 2" que es IMO correcto – 3dGrabber

1

Sobre la base de la solución escogida por el puesto autor Creo que hay una solución aún más simple que utiliza DistinctUntilChanged:

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) { 
     return leftSource 
      .Select<TLeft, Tuple<TLeft, int>>(Tuple.Create<TLeft, int>) 
      .CombineLatest(rightSource, 
       (l, r) => new { Index = l.Item2, Left = l.Item1, Right = r }) 
      .DistinctUntilChanged(x => x.Index) 
      .Select(x => selector(x.Left, x.Right)); 
    } 

o incluso

public static IObservable<TResult> CombineLatestOnLeft<TLeft, TRight, TResult>(this IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) { 
     return leftSource 
      .CombineLatest(rightSource, 
       (l, r) => new { Left = l, Right = r }) 
      .DistinctUntilChanged(x => x.Left) 
      .Select(x => selector(x.Left, x.Right)); 
    } 

si sólo se preocupan por los valores distintos de leftSource

5

Puede hacerlo utilizando operato existente rs.

Func<int, int, string> selector = (l, r) => l + " " + r; 

var query = right.Publish(rs => left.Zip(rs.MostRecent(0), selector).SkipUntil(rs)); 
  • Publish asegura que sólo he suscribimos right vez y compartir la suscripción entre todos los abonados a rs.

  • MostRecent convierte un IObservable<T> en un IEnumerable<T> que siempre produce el valor más recientemente emitida desde la fuente observable.

  • Zip entre IObservable<T> y IEnumerable<U> emite un valor cada vez que el observable emite un valor.

  • SkipUntil omite los pares (l, r) que se producen antes de que right alguna vez emite un valor.

0

hice un operador de RX para el proyecto actual que hace esto.

aquí está mi solución:

public static IObservable<Tuple<TSource, TTarget>> JoinLeftSoft<TSource, TTarget>(
     this IObservable<TSource> source, IObservable<TTarget> right) 
    { 
     return source 
      .Select(x => new Tuple<object, TSource>(new object(), x)) 
      .CombineLatest(right, (l, r) => new Tuple<object, TSource, TTarget>(l.Item1, l.Item2, r)) 
      .DistinctUntilChanged(t => t.Item1) 
      .Select(t => new Tuple<TSource, TTarget>(t.Item2, t.Item3)); 
    } 
0

En última System.Reactive, podemos utilizar WithLatestFrom método de extensión.

left.WithLatestFrom(right, (l, r) => l + " " + r).Dump(); 

El resultado estaría debajo correctamente.

3 2 
Cuestiones relacionadas