2012-03-18 17 views
25

Estoy tratando de modelar una consulta Rx que no es trivial (para mí):Rx corrientes

  • En una habitación hay hombres y mujeres.
  • Entran y salen de la habitación, y mientras están en la habitación a veces cambian su ubicación.
  • Cada hombre puede mirar a una (o cero) mujer en un momento dado.
  • Cada hombre tiene las siguientes propiedades:

    class Man 
    { 
        public const int LookingAtNobody = 0; 
        public int Id { get; set; } 
        public double Location { get; set; } 
        public int LookingAt { get; set; } 
    } 
    
  • Cada mujer tiene las siguientes propiedades:

    class Woman 
    { 
        public int Id { get; set; } 
        public double Location { get; set; } 
    } 
    
  • para representar los hombres que tenemos IObservable<IObservable<Man>>, y para representar a la mujeres tenemos IObservable<IObservable<Woman>> .

¿Cómo usar Rx para generar vectores de los hombres a las mujeres que están viendo: IObservable<IObservable<Tuple<double,double>>>?

Para ayudar, aquí hay algunas unidad de pruebas para algunos casos sencillos:

public class Tests : ReactiveTest 
{ 
    [Test] 
    public void Puzzle1() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(300)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle2() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(350), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle3() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }), 
      OnCompleted<Man>(400)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
    } 
    [Test] 
    public void Puzzle4() 
    { 
     var scheduler = new TestScheduler(); 

     var m1 = scheduler.CreateHotObservable(
      OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }), 
      OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }), 
      OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }), 
      OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }), 
      OnCompleted<Man>(500)); 

     var w1 = scheduler.CreateHotObservable(
      OnNext(150, new Woman { Id = 10, Location = 10.0 }), 
      OnNext(250, new Woman { Id = 10, Location = 20.0 }), 
      OnCompleted<Woman>(350)); 
     var w2 = scheduler.CreateHotObservable(
      OnNext(155, new Woman { Id = 20, Location = 100.0 }), 
      OnNext(255, new Woman { Id = 20, Location = 200.0 }), 
      OnNext(355, new Woman { Id = 20, Location = 300.0 }), 
      OnCompleted<Woman>(455)); 

     var men = scheduler.CreateHotObservable(OnNext(50, m1)); 
     var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2)); 

     var results = runQuery(scheduler, women, men); 

     var innerResults = (from msg in results 
          where msg.Value.HasValue 
          select msg.Value.Value).ToArray(); 
     var expectedVector1 = new[] 
         { 
          OnNext(200, Tuple.Create(2.0, 10.0)), 
          OnNext(250, Tuple.Create(2.0, 20.0)), 
          OnCompleted<Tuple<double,double>>(300), 
         }; 
     var expectedVector2 = new[] 
         { 
          OnNext(300, Tuple.Create(3.0, 200.0)), 
          OnNext(355, Tuple.Create(3.0, 300.0)), 
          OnNext(400, Tuple.Create(4.0, 300.0)), 
          OnCompleted<Tuple<double,double>>(455), 
         }; 
     ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]); 
     ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]); 
    } 

    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men) 
    { 
     // assuming nested sequences are hot 
     var vectors = 
      from manDuration in men 
      join womanDuration in women on manDuration equals womanDuration 
      select from man in manDuration 
        join woman in womanDuration on manDuration equals womanDuration 
        where man.LookingAt == woman.Id 
        select Tuple.Create(man.Location, woman.Location); 

     var query = vectors.Select(vectorDuration => 
     { 
      var vectorResults = scheduler.CreateObserver<Tuple<double, double>>(); 
      vectorDuration.Subscribe(vectorResults); 
      return vectorResults.Messages; 
     }); 

     var results = scheduler.Start(() => query, 0, 0, 1000).Messages; 
     return results; 
    } 
} 

(nota: esta pregunta se cruz ha publicado en los foros Rx: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

+4

DAT IEnumerable >>>>>> – Asti

+0

Su publicación en el foro de MSDN, y la cantidad de charla que creó demuestra que esto no es bueno pregunta para un sitio de preguntas y respuestas. –

+9

"No cruzar las corrientes" - Dr. Egon Spengler –

Respuesta

1

Si he entendido correctamente , el objetivo es crear un observable de "seguir observables", donde un "seguimiento observable" comienza cuando un hombre comienza a mirar a una mujer, y termina cuando el hombre deja de mirar a la mujer. El "seguimiento observable" debe consistir en tuplas de las ubicaciones más recientes del hombre y la mujer.

La idea aquí es usar CombineLatest, que tomará dos observables, y cuando cualquiera de ellos produce un valor, el combinador se evalúa para los dos valores más recientes de los observables, lo que produce un valor en el observable combinado. Sin embargo, CombineLatest finaliza solo cuando se han completado ambos observables. En este caso, queremos completar el observable cuando se complete cualquiera de las dos fuentes. Con el fin de hacerlo, se define el siguiente método de extensión (no creo que un procedimiento de este tipo ya existe, pero puede haber una solución más fácil):

public static IObservable<TSource> 
    UntilCompleted<TSource, TWhile>(this IObservable<TSource> source, 
             IObservable<TWhile> lifetime) 
{ 
    return Observable.Create<TSource>(observer => 
    { 
    var subscription = source.Subscribe(observer); 
    var limiter = lifetime.Subscribe(next => { },() => 
    { 
     subscription.Dispose(); 
     observer.OnCompleted(); 
    }); 
    return new CompositeDisposable(subscription, limiter); 
    }); 
} 

Este método es como TakeUntil, pero en lugar de tomando hasta lifetime produce un valor, tarda hasta que lifetime completa. También podemos definir un método simple extensión que toma la primera racha que satisface un predicado:

public static IObservable<TSource> 
    Streak<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate); 
} 

Ahora, para la consulta final, combinamos todos los hombres con todas las mujeres que usan CombineLatest, y completar ese observable temprana utilizando UntilCompleted. Para obtener el "seguir observables", seleccionamos la racha en la que el hombre está mirando a la mujer. Luego, simplemente asignamos eso a una tupla de ubicaciones.

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    select manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streak(pair => pair.Man.LookingAt == pair.Woman.Id) 
    .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location)); 

Esto pasa todas las pruebas, pero no se ocupa de la situación en la que un hombre mira a la mujer 10 durante un tiempo, luego a 20 durante un tiempo, y luego a 10 durante un tiempo nuevo; solo se usa la primera raya .Observar todas las vetas, podemos utilizar el siguiente método de extensión, que devuelve un observable de rayas:

public static IObservable<IObservable<TSource>> 
    Streaks<TSource>(this IObservable<TSource> source, 
         Func<TSource, bool> predicate) 
{ 
    return Observable.Create<IObservable<TSource>>(observer => 
    { 
    ReplaySubject<TSource> subject = null; 
    bool previous = false; 
    return source.Subscribe(x => 
    { 
     bool current = predicate(x); 
     if (!previous && current) 
     { 
     subject = new ReplaySubject<TSource>(); 
     observer.OnNext(subject); 
     } 
     if (previous && !current) subject.OnCompleted(); 
     if (current) subject.OnNext(x); 
     previous = current; 
    },() => 
    { 
     if (subject != null) subject.OnCompleted(); 
     observer.OnCompleted(); 
    }); 
    }); 
} 

Mediante la suscripción solamente una vez a la corriente de la fuente, y mediante el uso de un ReplaySubject, este método funciona para el agua caliente, así como observables fríos. Ahora, para la consulta final, seleccionamos todas las vetas de la siguiente manera:

var vectors = 
    from manDuration in men 
    from womanDuration in women 
    from streak in manDuration 
    .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w }) 
    .UntilCompleted(womanDuration) 
    .UntilCompleted(manDuration) 
    .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id) 
    select streak.Select(pair => 
    Tuple.Create(pair.Man.Location, pair.Woman.Location)); 
0

No estoy seguro de entender por qué está modelando la corriente de ubicaciones, tanto de los hombres y las mujeres como un IObservable<IObservable<T>> en lugar de sólo una IObservable<T>, pero esto podría funcionar:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                      IObservable<IObservable<Woman>> womenObservable) 
{ 
    return Observable.CombineLatest(
     menObservable.Switch(), 
     womenObservable.Switch(), 
     (man, woman) => new {man, woman}) 
      .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id) 
      .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location)); 
} 

el interruptores esencialmente "interruptor" para el nuevo observable cuando se empuja, que aplana las corrientes. El where y select son bastante sencillos.

Tengo la sospecha de que estoy malinterpretando algo sobre los requisitos, pero pensé que enviaría mi respuesta por si acaso me ayuda.