2012-05-31 12 views
7

Tengo un observable que representa un flujo de precios de acciones. Si no hay observadores en mi secuencia observable, me gustaría poder desconectarme del servidor remoto que está suministrando el flujo de precios, pero no quiero hacerlo hasta que cada observador haya llamado a Dispose(). Entonces, de manera similar, cuando la primera persona llama a Subscribe, me gustaría volver a conectarme con el servidor remoto.Seguir el (número de) observadores en un Observable?

¿Hay alguna forma de averiguar cuántos observadores han llamado para suscribirse a un observable? ¿O tal vez una forma de saber cuándo los observadores están llamando Suscribirse o Descartar?

Respuesta

3

IObservable<T> es un interface que puede implementar. En el método de suscripción de la interfaz puede realizar un seguimiento de los observadores manteniendo una lista internamente.

El siguiente fragmento de código es de MSDN.

private List<IObserver<Location>> observers; 

public IDisposable Subscribe(IObserver<Location> observer) 
{ 
    if (! observers.Contains(observer)) 
     observers.Add(observer); 

    // ------- If observers.Count == 1 create connection. ------- 

    return new Unsubscriber(observers, observer); 
} 
private class Unsubscriber : IDisposable 
{ 
    private List<IObserver<Location>>_observers; 
    private IObserver<Location> _observer; 

    public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer) 
    { 
     this._observers = observers; 
     this._observer = observer; 
    } 

    public void Dispose() 
    { 
     if (_observer != null && _observers.Contains(_observer)) 
     _observers.Remove(_observer); 
     // ----------- if observers.Count == 0 close connection ----------- 
    } 
} 
+0

Sí, pensé que así era como iba a tener que hacerlo. Tenía la esperanza de poder aprovechar uno de los temas incorporados, pero parece que voy a tener que ajustar uno de ellos (lo más probable es que BehaviorSubject) para poder hacer un seguimiento de los suscriptores. –

+0

Esta solución no proporciona ningún hilo de seguridad. Necesitará un poco de trabajo antes de entrar en producción. – Enigmativity

9

Simplemente utilizaré RefCount/Publish. Siempre siento que si estoy implementando IObservable, estoy trabajando demasiado.

myColdObservable.Publish().RefCount(); 

Esto hará que su observable deje de pulsar después de que todos se hayan desconectado. He aquí una muestra:

var coldObservable = Observable 
    .Interval(TimeSpan.FromSeconds(1)) 
    .ObserveOn(Scheduler.TaskPool) 
    .Select(_ => DoSomething()); 

var refCountObs = coldObservable.Publish().RefCount(); 

CompositeDisposable d = new CompositeDisposable(); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n))); 
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n))); 

//Wait a bit for work to happen 
System.Threading.Thread.Sleep(10000); 

//Everyone unsubscribes 
d.Dispose(); 

//Observe that DoSomething is not called. 
System.Threading.Thread.Sleep(3000); 

Esto no cubre el caso en el que realmente quiere saber el número de los abonados, pero creo que esto encaja con sus necesidades de dejar de trabajar si no hay suscriptores.

+0

Este enfoque no le da la cantidad de suscriptores, pero detiene la fuente observable cuando todos los suscriptores han finalizado. Mucho mejor que implementar tu propio observable. – Enigmativity

+0

Esta es la mejor respuesta –

3

En general, no implemente IObservable; normalmente ya hay algo en Rx que puede ayudarte, ya sea directamente o por medio de la composición. Si alguna vez tiene que implementar IObservable, use Observable.Create para hacerlo, para obtener todas las garantías requeridas para el contrato del observador, etc.

En cuanto a su problema, la sugerencia de utilizar Publish y RefCount es exactamente la composición estas buscando. Si desea contar usted mismo por alguna razón, use Observable.Defer para interceptar suscripciones, posiblemente con Observable. Finalmente para interceptar las terminaciones de secuencia. O bien, envuelva la fuente con un Observable.Create, reenvíe el observador a la secuencia envuelta y ajuste el ID devuelto que se puede enviar con la lógica de conteo (utilizando Disposable.Create).

Saludos,

-Bart (Rx) del equipo

4

poco de un viejo, pero me encontré con este post ya que tenía un problema en el que yo necesitaba saber el número de suscriptores. Utilizando la sugerencia de Bart, se me ocurrió esta extensión.

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged) 
{ 
int count = 0; 

return Observable.Defer(() => 
{ 
    count = Interlocked.Increment(ref count); 
    countChanged(count); 
    return source.Finally(() => 
    { 
     count = Interlocked.Decrement(ref count); 
     countChanged(count); 
    }); 
}); 
} 
Cuestiones relacionadas