2 votes

Observable.Delay appelant Dispose avant le déclenchement de OnNext

J'ai du mal à comprendre comment fonctionne Observable.Delay et quand Dispose() doit être appelé. Est-ce que quelqu'un de familier avec Rx pourrait m'aider s'il vous plaît ?

L'extrait de code suivant :

    static void Main(string[] args)
    {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));

        Console.ReadKey();
    }

    public class SomeObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> o)
        {
            for (var i = 0; i < 2; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();

            return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
    }

    public class DisposableAction : IDisposable
    {
        public DisposableAction(Action dispose)
        {
            this.dispose = dispose;
        }

        readonly Action dispose;

        public void Dispose()
        {
            dispose();
        }
    }

donne le résultat suivant :

0
1
ÉLIMINÉ
ÉLIMINÉ
ÉLIMINÉ
...0...
...1...
......0......
......1......

Je m'attendais à ce que ce soit plutôt comme.. :

0
1
ÉLIMINÉ
...0...
...1...
ÉLIMINÉ
......0......
......1......
ÉLIMINÉS

Une idée ?

2voto

Richard Szalay Points 42486

La fonctionnalité standard de Rx consiste à se débarrasser d'un abonnement lorsque la séquence est terminée (même si ses valeurs sont encore acheminées par une autre séquence).

C'est dans cette optique qu'il a été créé, Delay ne peut pas contrôler la vitesse des valeurs émises par la séquence source, il ne peut que retarder les valeurs pour ses propres observateurs.

0voto

PL. Points 2041

Si je devais deviner, je dirais que Delay met en file d'attente les éléments provenant de l'observable d'origine et les distribue ensuite comme il l'entend en fonction du délai spécifié. Ainsi, même si l'observable original a été éliminé depuis longtemps, l'observable créé par la méthode Delay est toujours en vie. Le comportement que vous observez correspond bien à cette explication.

-1voto

Zubin Appoo Points 41

Sans le ThreadPool, le comportement est identique :

0 1 ÉLIMINÉ ÉLIMINÉ ...0... ...1... ......0...... ......1......

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X