2009-11-20 12 views
37

Las extensiones reactivas vienen con una gran cantidad de métodos auxiliares para convertir eventos existentes y operaciones asincrónicas en observables, pero ¿cómo implementaría un IObservable <T> desde cero?Implementando IObservable <T> desde cero

IEnumerable tiene la encantadora palabra clave yield para que sea muy simple de implementar.

¿Cuál es la forma correcta de implementar IObservable <T>?

¿Debo preocuparme por la seguridad de los hilos?

Sé que hay soporte para recibir una llamada de vuelta en un contexto de sincronización específico, pero ¿es esto algo que yo como autor IObservable <T> debo preocuparme o esto de alguna manera está incorporado?

actualización:

Aquí está mi C# versión de la solución de Brian F #

using System; 
using System.Linq; 
using Microsoft.FSharp.Collections; 

namespace Jesperll 
{ 
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs 
    { 
     private FSharpMap<int, IObserver<T>> subscribers = 
       FSharpMap<int, IObserver<T>>.Empty; 
     private readonly object thisLock = new object(); 
     private int key; 
     private bool isDisposed; 

     public void Dispose() 
     { 
      Dispose(true); 
     } 

     protected virtual void Dispose(bool disposing) 
     { 
      if (disposing && !isDisposed) 
      { 
       OnCompleted(); 
       isDisposed = true; 
      } 
     } 

     protected void OnNext(T value) 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnNext(value); 
      } 
     } 

     protected void OnError(Exception exception) 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      if (exception == null) 
      { 
       throw new ArgumentNullException("exception"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnError(exception); 
      } 
     } 

     protected void OnCompleted() 
     { 
      if (isDisposed) 
      { 
       throw new ObjectDisposedException("Observable<T>"); 
      } 

      foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value)) 
      { 
       observer.OnCompleted(); 
      } 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      if (observer == null) 
      { 
       throw new ArgumentNullException("observer"); 
      } 

      lock (thisLock) 
      { 
       int k = key++; 
       subscribers = subscribers.Add(k, observer); 
       return new AnonymousDisposable(() => 
       { 
        lock (thisLock) 
        { 
         subscribers = subscribers.Remove(k); 
        } 
       }); 
      } 
     } 
    } 

    class AnonymousDisposable : IDisposable 
    { 
     Action dispose; 
     public AnonymousDisposable(Action dispose) 
     { 
      this.dispose = dispose; 
     } 

     public void Dispose() 
     { 
      dispose(); 
     } 
    } 
} 

edición: No tire ObjectDisposedException si Desechar se llama dos veces

+1

Wes Dyer ahora tiene un video en Channel9 hablando de los contratos para estas interfaces. – Benjol

+1

(30 años después ... http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/) – Benjol

+0

Cool - se asegurará de verlo :) –

Respuesta

10

Honestamente, no estoy seguro de qué tan "correcto" es todo esto, pero si se siente bastante bien según mi experiencia hasta ahora. Es código F #, pero con suerte obtienes una idea del sabor. Le permite 'actualizar' un objeto de origen, al que puede llamar Siguiente/Completado/Error, y administra las suscripciones e intenta Assert cuando el origen o los clientes hacen cosas malas.

type ObservableSource<'T>() =  // ' 
    let protect f = 
     let mutable ok = false 
     try 
      f() 
      ok <- true 
     finally 
      Debug.Assert(ok, "IObserver methods must not throw!") 
      // TODO crash? 
    let mutable key = 0 
    // Why a Map and not a Dictionary? Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over 
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>> // ' 
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnNext(x))) 
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnCompleted())) 
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun() -> v.OnError(e))) 
    let thisLock = new obj() 
    let obs = 
     { new IObservable<'T> with  // ' 
      member this.Subscribe(o) = 
       let k = 
        lock thisLock (fun() -> 
         let k = key 
         key <- key + 1 
         subscriptions <- subscriptions.Add(k, o) 
         k) 
       { new IDisposable with 
        member this.Dispose() = 
         lock thisLock (fun() -> 
          subscriptions <- subscriptions.Remove(k)) } } 
    let mutable finished = false 
    // The methods below are not thread-safe; the source ought not call these methods concurrently 
    member this.Next(x) = 
     Debug.Assert(not finished, "IObserver is already finished") 
     next x 
    member this.Completed() = 
     Debug.Assert(not finished, "IObserver is already finished") 
     finished <- true 
     completed() 
    member this.Error(e) = 
     Debug.Assert(not finished, "IObserver is already finished") 
     finished <- true 
     error e 
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads 
    member this.Value = obs 

Me interesarán las ideas sobre lo que está bien o mal aquí; No he tenido la oportunidad de mirar a todas las cosas nuevas Rx de devlabs sin embargo ...

Mis propias experiencias sugieren que:

  • Aquellos que se suscriban a los observables nunca se debe tirar de las suscripciones. No hay nada razonable que un observable pueda hacer cuando lanza un suscriptor. (Esto es similar a los eventos.) Lo más probable es que la excepción simplemente suba a un controlador catch-all de nivel superior o bloquee la aplicación.
  • Las fuentes probablemente deberían ser "lógicamente de un solo hilo". Creo que puede ser más difícil escribir clientes que puedan reaccionar a llamadas concurrentes OnNext; incluso si cada llamada individual proviene de un hilo diferente, es útil evitar llamadas concurrentes.
  • Definitivamente es útil tener una clase base/ayudante que imponga algunos 'contratos'.

estoy muy ansioso por ver si la gente puede mostrar un asesoramiento más concreto a lo largo de estas líneas.

+1

Gracias, tuve una oportunidad de crear algo similar en C# y terminé usando la colección F # Map para evitar el bloqueo durante la enumeración. Otra opción es usar algo como Immutable AVLTree de Eric Lippert. Me he convencido a mí mismo de que es responsabilidad del observador garantizar que los eventos se reciban en el contexto adecuado y que lo observable debería limitarse a generar eventos en el mismo hilo cada vez (a medida que escribe). –

2
  1. grieta abierta Reflector y echar un vistazo.

  2. ver algunos videos C9 - this uno muestra cómo se puede 'deriva' Select 'Combinator'

  3. El secreto es crear clases AnonymousObservable, AnonymousObserver y AnonymousDisposable, (que son sólo el trabajo arounds por el hecho que no puedes crear instancias de interfaces). No contienen implementación, ya que lo transfieres con acciones y funciones.

Por ejemplo:

public class AnonymousObservable<T> : IObservable<T> 
{ 
    private Func<IObserver<T>, IDisposable> _subscribe; 
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe) 
    { 
     _subscribe = subscribe; 
    } 

    public IDisposable Subscribe(IObserver<T> observer) 
    { 
     return _subscribe(observer); 
    } 
} 

dejaré que usted se resuelve el resto ... Es un muy buen ejercicio de comprensión.

Hay un pequeño hilo que crece here con preguntas relacionadas.

+1

Gracias, pero no tan útil. Ya he revisado tanto el reflector como la mayoría de los videos de C9. El reflector solo muestra la implementación real y es muy difícil deducir reglas sobre el enhebrado y demás. También su llamado secreto simplemente impone la responsabilidad de una implementación correcta de la clase real observable a la Func suministrada - no revela las reglas para implementar ese Func. Así que, básicamente, no me dijiste nada excepto para descubrir el resto yo solo :) –

+1

Punto tomado.Para ser honesto, la mayoría de mis esfuerzos hasta ahora han estado tratando de escribir lo que ellos llaman 'combinadores', a diferencia de las fuentes reales. Puede obtener algunas pautas de las respuestas a mi pregunta aquí (el mejor lugar para obtener respuestas 'oficiales' en este momento): http://social.msdn.microsoft.com/Forums/en-US/rx/thread/79402dd3 -009a-46db-9b55-06482e8cad0e – Benjol

2

simplemente una observación con respecto a esta aplicación:

después colecciones concurrentes están introduciendo en FW .net 4 es probable que sea mejor utilizar ConcurrentDictioary en lugar de un simple diccionario.

guarda bloqueos de manejo en la colección.

adi.

6

Sí, la palabra clave de rendimiento es encantadora; tal vez habrá algo similar para IObservable (OfT)? [Editar:. En la PDC '09 talk Eric Meijer, dice "sí, ver este espacio" a un rendimiento declarativo para la generación de los observables]

algo cerca (en vez de rodar su propio), echa un vistazo a the bottom de la wiki "(not yet) 101 Rx Samples", donde el equipo sugiere el uso de la clase Subject (T) como un "backend" para implementar un IObservable (OfT). Aquí está su ejemplo:

public class Order 
{    
    private DateTime? _paidDate; 

    private readonly Subject<Order> _paidSubj = new Subject<Order>(); 
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } } 

    public void MarkPaid(DateTime paidDate) 
    { 
     _paidDate = paidDate;     
     _paidSubj.OnNext(this); // Raise PAID event 
    } 
} 

private static void Main() 
{ 
    var order = new Order(); 
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe 

    order.MarkPaid(DateTime.Now); 
} 
+0

'Asunto' en mi humilde opinión es definitivamente el camino correcto cuando desea generar sus propios observables. –

+2

Solo una nota al margen, AsyncSubject es una mejor opción aquí, ya que mantiene el último valor para los futuros suscriptores. En su ejemplo, es necesario suscribirse antes de que ocurra el evento de pago real. – Nappy

+0

@Nappy: No sabía acerca de 'AsyncSubject ' -gracias por mencionarlo. –

11

El official documentation deprecates usuarios ejecución IObservable sí mismos. En lugar de ello, se espera que los usuarios utilicen el método de fábrica Observable.Create

Cuando sea posible, implementar nuevos operadores mediante la composición de los operadores existentes. De lo contrario, aplicar operadores personalizados utilizando Observable.Create

Sucede que Observable.Create es una envoltura alrededor de la clase trivial interna del reactivo AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) 
{ 
    if (subscribe == null) 
    { 
     throw new ArgumentNullException("subscribe"); 
    } 
    return new AnonymousObservable<TSource>(subscribe); 
} 

No sé por qué no hacen su aplicación público, pero bueno, lo que sea.

+0

Correcto. No implemente 'IObservable ' o 'IObserver ' usted mismo. –

+0

Hola Lee. Me encanta su libro, anteriormente blog sobre RX, una guía mucho mejor que los documentos oficiales. –

+1

Saludos. Como Rx ahora es de código abierto, espero poder ayudar al equipo a actualizar el código oficial/documentos. –

Cuestiones relacionadas