Las extensiones reactivas vienen con una gran cantidad de métodos auxiliares para convertir eventos existentes y operaciones asincrónicas en observables, pero ¿cómo implementaría un IObservable <T> desde cero?Implementando IObservable <T> desde cero
IEnumerable tiene la encantadora palabra clave yield para que sea muy simple de implementar.
¿Cuál es la forma correcta de implementar IObservable <T>?
¿Debo preocuparme por la seguridad de los hilos?
Sé que hay soporte para recibir una llamada de vuelta en un contexto de sincronización específico, pero ¿es esto algo que yo como autor IObservable <T> debo preocuparme o esto de alguna manera está incorporado?
actualización:
Aquí está mi C# versión de la solución de Brian F #
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
edición: No tire ObjectDisposedException si Desechar se llama dos veces
Wes Dyer ahora tiene un video en Channel9 hablando de los contratos para estas interfaces. – Benjol
(30 años después ... http://channel9.msdn.com/posts/J.Van.Gogh/Reactive-Extensions-API-in-depth-Contract/) – Benjol
Cool - se asegurará de verlo :) –