6

Crearé un observable (a través de una variedad de medios) y se lo devolveré a las partes interesadas, pero cuando hayan terminado de escuchar, me gustaría eliminar lo observable para que no continúe consumiendo recursos. Otra forma de pensar que es crear temas en un sistema secundario de pub. Cuando ya nadie está suscrito a un tema, ya no desea mantener el tema y su filtrado.¿Cómo puedo crear un observable Rx que deje de publicar eventos cuando el último observador cancela la suscripción?

Respuesta

10

Rx ya cuenta con un operador para que se adapte a sus necesidades - y dos en realidad - Publish & RefCount.

Aquí es cómo usarlos:

IObservable xs = ... 

var rxs = xs.Publish().RefCount(); 

var sub1 = rxs.Subscribe(x => { }); 
var sub2 = rxs.Subscribe(x => { }); 

//later 
sub1.Dispose(); 

//later 
sub2.Dispose(); 

//The underlying subscription to `xs` is now disposed of. 

simple.

1

Si he entendido su pregunta, quiere crear la observable de modo que cuando todos los suscriptores hayan eliminado su suscripción, es decir, no haya más suscriptores, entonces quiera ejecutar una función de limpieza que detendrá los valores observables de producción . Si esto es lo que quieres, entonces se puede hacer algo, como a continuación: Ejemplo de

//Wrap a disposable 
public class WrapDisposable : IDisposable 
    { 
     IDisposable disp; 
     Action act; 
     public WrapDisposable(IDisposable _disp, Action _act) 
     { 
      disp = _disp; 
      act = _act; 
     } 
     void IDisposable.Dispose() 
     { 
      act(); 
      disp.Dispose(); 
     } 
    } 

    //Observable that we want to clean up after all subs are done 
    public static IObservable<long> GenerateObs(out Action cleanup) 
    { 
     cleanup =() => 
     { 
      Console.WriteLine("All subscribers are done. Do clean up"); 
     }; 
     return Observable.Interval(TimeSpan.FromSeconds(1)); 
    } 
    //Wrap the observable 
    public static IObservable<T> WrapToClean<T>(IObservable<T> obs, Action onAllDone) 
    { 
     int count = 0; 
     return Observable.CreateWithDisposable<T>(ob => 
     { 
      var disp = obs.Subscribe(ob); 
      Interlocked.Increment(ref count); 
      return new WrapDisposable(disp,() => 
      { 
       if (Interlocked.Decrement(ref count) == 0) 
       { 
        onAllDone();             
       } 
      }); 
     }); 
    } 

// Uso:

Action cleanup; 
var obs = GenerateObs(out cleanup); 
var newObs = WrapToClean(obs, cleanup); 
newObs.Take(6).Subscribe(Console.WriteLine); 
newObs.Take(5).Subscribe(Console.WriteLine); 
Cuestiones relacionadas