Le Réactif Extensions de venir avec un grand nombre de méthodes d'assistance pour le tournage des événements existants et les opérations asynchrones dans les phénomènes observables, mais comment voulez-vous mettre en œuvre un IObservable<T> à partir de zéro?
IEnumerable a la belle de rendement mot-clé, il est très simple à mettre en œuvre.
Quelle est la bonne façon de mettre en œuvre IObservable<T>?
Ai-je besoin de vous soucier de la sécurité des threads?
Je sais, il ya un soutien pour les avoir à intervenir sur un contexte de synchronisation mais est-ce quelque chose que j'ai comme un IObservable<T> auteur besoin de s'inquiéter ou de ce en quelque sorte intégré?
mise à jour:
Voici ma version de C# de Brian F# solution
using System;
using System.Linq;
using Microsoft.FSharp.Collections;
namespace Jesperll
{
class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
{
private FSharpMap<int, IObserver<T>> subscribers =
FSharpMap<int, IObserver<T>>.Empty;
private readonly object thisLock = new object();
private int key;
private bool isDisposed;
public void Dispose()
{
Dispose(true);
}
protected virtual void Dispose(bool disposing)
{
if (disposing && !isDisposed)
{
OnCompleted();
isDisposed = true;
}
}
protected void OnNext(T value)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnNext(value);
}
}
protected void OnError(Exception exception)
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
if (exception == null)
{
throw new ArgumentNullException("exception");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnError(exception);
}
}
protected void OnCompleted()
{
if (isDisposed)
{
throw new ObjectDisposedException("Observable<T>");
}
foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
{
observer.OnCompleted();
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
lock (thisLock)
{
int k = key++;
subscribers = subscribers.Add(k, observer);
return new AnonymousDisposable(() =>
{
lock (thisLock)
{
subscribers = subscribers.Remove(k);
}
});
}
}
}
class AnonymousDisposable : IDisposable
{
Action dispose;
public AnonymousDisposable(Action dispose)
{
this.dispose = dispose;
}
public void Dispose()
{
dispose();
}
}
}
edit: Ne pas jeter de ObjectDisposedException si dispose est appelé deux fois