var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
var zip = a.Zip(b, (x, y) => x + "-" + y);
zip.Subscribe(Console.WriteLine);
imprime
a 0 - 5
de 1 - 6
2 - 7
...Cómo unir varias secuencias IObservable?
En su lugar, me gustaría unirse a valores idénticos
5 - 5
6 - 6
7 - 7
8 - 8
...
Este es un ejemplo simplificado del problema al fusionar cientos de secuencias asíncronas ordenadas. Es muy fácil unir dos IEnumerables, pero no pude encontrar una manera de hacer algo como esto en Rx. ¿Algunas ideas?
Más sobre las entradas y lo que estoy tratando de lograr. Básicamente, todo el sistema es una canalización en tiempo real con múltiples máquinas de estado (agregadores, búferes, filtros de suavizado, etc.) conectados por un patrón de combinación de horquilla. ¿RX es una buena opción para implementar tales cosas? Cada entrada se puede representar como
public struct DataPoint
{
public double Value;
public DateTimeOffset Timestamp;
}
Cada bit de entrada de datos se marcado con la fecha a la llegada, por lo tanto todos los eventos son, naturalmente, clasificadas por su clave de unión (timestamp). A medida que los eventos viajan a través de la tubería, se bifurcan y se unen. Las uniones deben correlacionarse por marca de tiempo y aplicarse en orden predefinido. Por ejemplo, join (a, b, c, d) => join (join (join (a, b), c), d).
Editar A continuación se muestra lo que podría ocurrir con prisa. Esperemos que haya una solución más simple basada en los operadores de Rx existentes.
static void Test()
{
var a = Observable.Range(0, 10);
var b = Observable.Range(5, 10);
//var zip = a.Zip(b, (x, y) => x + "-" + y);
//zip.Subscribe(Console.WriteLine);
var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
joined.Subscribe(Console.WriteLine);
}
static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
{
return Observable.CreateWithDisposable<string>(o =>
{
Queue<int> a = new Queue<int>();
Queue<int> b = new Queue<int>();
object gate = new object();
left.Subscribe(x =>
{
lock (gate)
{
if (a.Count == 0 || a.Peek() < x)
a.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
right.Subscribe(x =>
{
lock (gate)
{
if (b.Count == 0 || b.Peek() < x)
b.Enqueue(x);
while (a.Count != 0 && b.Count != 0)
{
if (a.Peek() == b.Peek())
{
o.OnNext(selector(a.Dequeue(), b.Dequeue()));
}
else if (a.Peek() < b.Peek())
{
a.Dequeue();
}
else
{
b.Dequeue();
}
}
}
});
return Disposable.Empty;
});
se le preguntó sobre la misma cuestión [rx foro] (http://social.msdn.microsoft.com/Forums/en-US/rx/thread/adbcd963- 0c83-4968-a1b2-1317d5e31ae5) –