2011-02-06 10 views
7
 var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     var zip = a.Zip(b, (x, y) => x + "-" + y); 
     zip.Subscribe(Console.WriteLine); 

imprime
a 0 - 5
de 1 - 6
2 - 7
...Cómo unir varias secuencias IObservable?

En su lugar, me gustaría unirse a valores idénticos
5 - 5
6 - 6
7 - 7
8 - 8
...

Este es un ejemplo simplificado del problema al fusionar cientos de secuencias asíncronas ordenadas. Es muy fácil unir dos IEnumerables, pero no pude encontrar una manera de hacer algo como esto en Rx. ¿Algunas ideas?

Más sobre las entradas y lo que estoy tratando de lograr. Básicamente, todo el sistema es una canalización en tiempo real con múltiples máquinas de estado (agregadores, búferes, filtros de suavizado, etc.) conectados por un patrón de combinación de horquilla. ¿RX es una buena opción para implementar tales cosas? Cada entrada se puede representar como

public struct DataPoint 
{ 
    public double Value; 
    public DateTimeOffset Timestamp; 
} 

Cada bit de entrada de datos se marcado con la fecha a la llegada, por lo tanto todos los eventos son, naturalmente, clasificadas por su clave de unión (timestamp). A medida que los eventos viajan a través de la tubería, se bifurcan y se unen. Las uniones deben correlacionarse por marca de tiempo y aplicarse en orden predefinido. Por ejemplo, join (a, b, c, d) => join (join (join (a, b), c), d).

Editar A continuación se muestra lo que podría ocurrir con prisa. Esperemos que haya una solución más simple basada en los operadores de Rx existentes.

static void Test() 
    { 
     var a = Observable.Range(0, 10); 
     var b = Observable.Range(5, 10); 
     //var zip = a.Zip(b, (x, y) => x + "-" + y); 
     //zip.Subscribe(Console.WriteLine); 

     var joined = MergeJoin(a,b, (x,y) => x + "-" + y); 
     joined.Subscribe(Console.WriteLine); 
    } 

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
    { 
     return Observable.CreateWithDisposable<string>(o => 
      { 
       Queue<int> a = new Queue<int>(); 
       Queue<int> b = new Queue<int>(); 
       object gate = new object(); 

       left.Subscribe(x => 
        { 
         lock (gate) 
         { 
          if (a.Count == 0 || a.Peek() < x) 
           a.Enqueue(x); 

          while (a.Count != 0 && b.Count != 0) 
          { 
           if (a.Peek() == b.Peek()) 
           { 
            o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
           } 
           else if (a.Peek() < b.Peek()) 
           { 
            a.Dequeue(); 
           } 
           else 
           { 
            b.Dequeue(); 
           } 
          } 
         } 
        }); 

       right.Subscribe(x => 
       { 
        lock (gate) 
        { 
         if (b.Count == 0 || b.Peek() < x) 
          b.Enqueue(x); 

         while (a.Count != 0 && b.Count != 0) 
         { 
          if (a.Peek() == b.Peek()) 
          { 
           o.OnNext(selector(a.Dequeue(), b.Dequeue())); 
          } 
          else if (a.Peek() < b.Peek()) 
          { 
           a.Dequeue(); 
          } 
          else 
          { 
           b.Dequeue(); 
          } 
         } 
        } 
       }); 

       return Disposable.Empty; 
      }); 
+0

se le preguntó sobre la misma cuestión [rx foro] (http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –

Respuesta

1

Sinceramente, no puedo pensar de una solución basada en operadores existentes que funciona para fuentes calientes de orden desconocido (es decir, xs before ys frente a ys before xs). Su solución parece estar bien (bueno, si funciona), pero me gustaría hacer algunos cambios si fuera mi código:

  • cancelación de Apoyo adecuadamente usando MutableDisposable y CompositeDisposable
  • llamada OnError de excepciones lanzadas desde el selector (lo que es más consistente con otros operadores)
  • considerar el apoyo a la terminación si es posible que una fuente para completar antes que el otro

el código siguiente se ha probado con la entrada de dos campos, el s AME entradas volteado, así como con Empty<int> + Never<int>:

public static IObservable<string> MergeJoin(
    IObservable<int> left, IObservable<int> right, Func<int, int, string> selector) 
{ 
    return Observable.CreateWithDisposable<string>(o => 
    { 
     Queue<int> a = new Queue<int>(); 
     Queue<int> b = new Queue<int>(); 
     object gate = new object(); 

     bool leftComplete = false; 
     bool rightComplete = false; 

     MutableDisposable leftSubscription = new MutableDisposable(); 
     MutableDisposable rightSubscription = new MutableDisposable(); 

     Action tryDequeue =() => 
     { 
      lock (gate) 
      { 
       while (a.Count != 0 && b.Count != 0) 
       { 
        if (a.Peek() == b.Peek()) 
        { 
         string value = null; 

         try 
         { 
          value = selector(a.Dequeue(), b.Dequeue()); 
         } 
         catch (Exception ex) 
         { 
          o.OnError(ex); 
          return; 
         } 

         o.OnNext(value); 
        } 
        else if (a.Peek() < b.Peek()) 
        { 
         a.Dequeue(); 
        } 
        else 
        { 
         b.Dequeue(); 
        } 
       } 
      } 
     }; 

     leftSubscription.Disposable = left.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (a.Count == 0 || a.Peek() < x) 
        a.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      leftComplete = true; 

      if (a.Count == 0 || rightComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     rightSubscription.Disposable = right.Subscribe(x => 
     { 
      lock (gate) 
      { 
       if (b.Count == 0 || b.Peek() < x) 
        b.Enqueue(x); 

       tryDequeue(); 

       if (rightComplete && b.Count == 0) 
       { 
        o.OnCompleted(); 
       } 
      } 
     },() => 
     { 
      rightComplete = true; 

      if (b.Count == 0 || leftComplete) 
      { 
       o.OnCompleted(); 
      } 
     }); 

     return new CompositeDisposable(leftSubscription, rightSubscription); 
    }); 
} 
3

GroupBy puede hacer lo que necesita. Parece que no tienes límites de tiempo cuando los artículos se "unen", solo necesitas elementos similares para estar juntos de alguna manera.

Observable.Merge(Observable.Range(1, 10), Observable.Range(5, 15)) 
.GroupBy(k => k) 
.Subscribe(go => go.Count().Where(cnt => cnt > 1) 
          .Subscribe(cnt => 
        Console.WriteLine("Key {0} has {1} matches", go.Key, cnt))); 

Dos cosas a tener en cuenta sobre lo anterior, Merge tiene las siguientes sobrecargas, por lo que el su req tener cientos de corrientes unidas no presentará un problema:

Merge<TSource>(params IObservable<TSource>[] sources); 
Merge<TSource>(this IEnumerable<IObservable<TSource>> sources); 
Merge<TSource>(this IObservable<IObservable<TSource>> source); 

Por otra parte, los rendimientos GroupByIObservable<IGroupedObservable<TKey, TSource>>, lo que significa que puede reaccionar a cada grupo y a cada nuevo miembro de cada grupo a medida que ingresen, sin necesidad de esperar hasta que todo esté completo.

+0

El único problema es que necesito poder unir los valores en orden. Sin embargo, puede resolverse si en lugar de int paso las tuplas de valor de índice. –

+0

¿Qué quiere decir con "en orden"? –

+0

Tenga en cuenta que al usar 'Merge' +' Count', no obtendrá ninguna coincidencia hasta que ambas secuencias de origen finalicen. Esto está bien para el ejemplo 'Rango', pero si sus fuentes son calientes/sin fin, la salida puede no ser la esperada. –

1

¿Qué le parece usar el nuevo operador de unión en v.2838.

var a = Observable.Range(1, 10); 
var b = Observable.Range(5, 10); 

var joinedStream = a.Join(b, _ => Observable.Never<Unit>(), _ => Observable.Never<Unit>(), 
    (aOutput, bOutput) => new Tuple<int, int>(aOutput, bOutput)) 
    .Where(tupple => tupple.Item1 == tupple.Item2); 

joinedStream.Subscribe(output => Trace.WriteLine(output)); 

Este es mi primer vistazo a Join y no estoy seguro de si sería aconsejable utilizar el operador Never como este. Cuando se trata de un gran volumen de entradas, ya que generaría una gran cantidad de operaciones, se revierten más datos. Creo que se podría trabajar para cerrar las ventanas a medida que se hacen los ajustes y hacer que la solución sea más eficiente. Dicho esto, el ejemplo anterior funciona según tu pregunta.

Para el registro, creo que la respuesta de Scott es probablemente el camino a seguir en este caso. Solo estoy presentando esto como una alternativa potencial.

+0

+1 para la solución con unirse. Pasé una hora ayer y no pude hacer que funcione. Comparto tus preocupaciones sobre el rendimiento. Además, el código resultante es mucho más críptico y difícil de seguir en comparación con la simple unión LINQ. Estoy empezando a pensar que Rx no es una buena solución para este tipo de problemas. –

+0

@Serger - Estoy seguro de que esto podría hacerse más eficiente emitiendo valores de duración a medida que se realizan las coincidencias (es decir, reemplazando el Observable.Never con algo un poco más inteligente). Todo dependerá de para qué sirve la regla cuando sea seguro completar la duración. –

2

Esta respuesta se copia de la Rx forums, sólo para que se archivará en aquí también:

var xs = Observable.Range(1, 10); 
var ys = Observable.Range(5, 10); 

var joined = from x in xs 
    from y in ys 
    where x == y 
    select x + "-" + y; 

o sin el uso de expresiones de consulta:

var joined = 
    xs.SelectMany(x => ys, (x, y) => new {x, y}) 
    .Where(t => t.x == t.y) 
    .Select(t => t.x + "-" + t.y); 
+2

El único problema con esta solución es que requiere que 'ys' esté caliente (o' Multicast') y no admite el escenario donde el valor 'ys' aparece antes que el valor' xs'. –

Cuestiones relacionadas