38 votes

Implémentation d'IObservable <T> à partir de rien

Le Réactif Extensions de venir avec un grand nombre de méthodes d'assistance pour le tournage des événements existants et les opérations asynchrones dans les phénomènes observables, mais comment voulez-vous mettre en œuvre un IObservable<T> à partir de zéro?

IEnumerable a la belle de rendement mot-clé, il est très simple à mettre en œuvre.

Quelle est la bonne façon de mettre en œuvre IObservable<T>?

Ai-je besoin de vous soucier de la sécurité des threads?

Je sais, il ya un soutien pour les avoir à intervenir sur un contexte de synchronisation mais est-ce quelque chose que j'ai comme un IObservable<T> auteur besoin de s'inquiéter ou de ce en quelque sorte intégré?

mise à jour:

Voici ma version de C# de Brian F# solution

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

edit: Ne pas jeter de ObjectDisposedException si dispose est appelé deux fois

11voto

Colonel Panic Points 18390

La documentation officielle dénonçait les utilisateurs de la mise en œuvre de IObservable eux-mêmes. Au lieu de cela, les utilisateurs doivent utiliser la méthode de factorisation Observable.Create

Lorsque cela est possible, de mettre en œuvre de nouveaux opérateurs, en composant des opérateurs existants. Sinon implémenter des opérateurs utilisant des Observables.Créer

Il arrive que les Observables.Créer est un trivial wrapper autour de Réactif est une classe interne AnonymousObservable:

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

Je ne sais pas pourquoi ils ne font pas leur mise en œuvre public, mais bon, peu importe.

10voto

Brian Points 82719

Honnêtement, je ne suis pas sûr de la façon de "droit" tout cela est, mais s'il se sent assez bon, d'après mon expérience jusqu'à présent. C'est le code F#, mais j'espère que vous obtenez une idée de la saveur. Il vous permet de "nouveau" un objet source, que vous pouvez ensuite appeler à Côté/Terminé/Erreur, et il gère les abonnements et tente de faire Valoir lorsque la source ou les clients de faire de mauvaises choses.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Je serai intéressé par toutes les pensées à propos de ce qui est bon ou mauvais ici; je n'ai pas eu la chance de voir toutes les nouvelles Rx trucs de devlabs encore...

Mes propres expériences suggèrent que:

  • Ceux qui s'abonnent à des observables ne devrait jamais jeter de l'abonnement. Il n'y a rien de raisonnable observable peut faire quand un abonné de lancers. (Ceci est similaire à des événements.) Probablement l'exception seulement de la bulle jusqu'à un niveau supérieur fourre-tout gestionnaire ou le plantage de l'application.
  • Sources devrait probablement être "logiquement mono-thread". Je pense qu'il peut être plus difficile à écrire des clients qui peuvent réagir à la concomitance des OnNext appels; même si chaque appel provient d'un autre thread, il est utile d'éviter des appels simultanés.
  • Il est certainement utile d'avoir une base/classe helper qui impose certains "contrats".

Je suis très curieux de savoir si les gens peuvent se montrer plus des conseils concrets, le long de ces lignes.

7voto

David Cuccia Points 1036

Oui, le mot clé yield est jolie; il y aura peut-être quelque chose de similaire pour IObservable(OfT)? [edit: Eric Meijer du PDC '09 de parler , il dit: "oui, regardez cet espace" à déclaratif de rendement pour générer des phénomènes observables.]

Pour quelque chose de proche (au lieu de roulement de votre propre), vérifier le fond de l' "(pas encore) 101 Rx Échantillons" wiki, où l'équipe suggère l'utilisation de l'Objet(T) de classe comme un "backend" pour mettre en œuvre une IObservable(OfT). Voici l'exemple:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}

2voto

Benjol Points 16334
  1. Crack ouvrir un Réflecteur et un coup d'oeil.

  2. Regarder C9 vidéos - cette montre comment vous pouvez "tirer" l'Sélectionnez "combinator'

  3. Le secret est de créer AnonymousObservable, AnonymousObserver et AnonymousDisposable classes (qui sont juste des solutions pour le fait que vous ne pouvez pas instancier des interfaces). Ils contiennent zéro de la mise en œuvre, et comme vous, que dans les Actions et les Funcs.

Par exemple:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

Je vais vous laisser travailler le reste... c'est un très bon exercice de compréhension.

Il y a un joli petit fil de croissance ici avec des questions connexes.

2voto

Adiel Yaacov Points 491

juste une remarque concernant cette implémentation:

Après l’introduction de collections simultanées dans .net fw 4, il est probablement préférable d’utiliser ConcurrentDictioary au lieu d’un simple dictionnaire.

il enregistre les verrous de manipulation sur la collection.

adi.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X