5 votes

Opérateur Rx personnalisé pour l'étranglement seulement quand il y a eu une valeur récente

J'essaie de créer un opérateur Rx qui semble assez utile, mais je n'ai étonnamment pas trouvé de questions sur Stackoverflow qui correspondent précisément. J'aimerais créer une variante de Throttle qui laisse passer les valeurs immédiatement s'il y a eu une période d'inactivité. Le cas d'utilisation que j'imagine est quelque chose comme ceci :

J'ai une liste déroulante qui déclenche une requête web lorsque la valeur est modifiée. Si l'utilisateur maintient la touche fléchée enfoncée et fait défiler rapidement les valeurs, je ne veux pas déclencher une requête pour chaque valeur. Mais si j'étrangle le flux, l'utilisateur doit attendre la durée de l'étranglement chaque fois qu'il sélectionne une valeur dans la liste déroulante de manière normale.

Ainsi, alors qu'un Throttle ressemble à ça : Normal Throttle():

Je veux créer ThrottleSubsequent qui ressemblent à ça : ThrottleSubsequent():

Notez que les billes 1, 2 et 6 sont passées sans délai car elles suivent chacune une période d'inactivité.

Ma tentative ressemble à ce qui suit :

public static IObservable<TSource> ThrottleSubsequent<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
    // Create a timer that resets with each new source value
    var cooldownTimer = source
        .Select(x => Observable.Interval(dueTime, scheduler)) // Each source value becomes a new timer
        .Switch(); // Switch to the most recent timer

    var cooldownWindow = source.Window(() => cooldownTimer);

    // Pass along the first value of each cooldown window immediately
    var firstAfterCooldown = cooldownWindow.SelectMany(o => o.Take(1));

    // Throttle the rest of the values 
    var throttledRest = cooldownWindow
        .SelectMany(o => o.Skip(1))
        .Throttle(dueTime, scheduler);

    return Observable.Merge(firstAfterCooldown, throttledRest);
}

Este semble mais j'ai du mal à raisonner et j'ai l'impression qu'il y a des cas limites où les choses peuvent se gâter avec des valeurs en double ou autre. J'aimerais avoir l'avis d'utilisateurs de Rx plus expérimentés pour savoir si ce code est correct ou non, et/ou s'il existe une façon plus idiomatique de le faire.

2voto

Shlomo Points 1913

Voici une suite de tests (utilisant nuget Microsoft.Reactive.Testing ) :

var ts = new TestScheduler();
var source = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(300.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(510.MsTicks(), Notification.CreateOnNext('D')),
    new Recorded<Notification<char>>(550.MsTicks(), Notification.CreateOnNext('E')),
    new Recorded<Notification<char>>(610.MsTicks(), Notification.CreateOnNext('F')),
    new Recorded<Notification<char>>(760.MsTicks(), Notification.CreateOnNext('G'))
);

var target = source.ThrottleSubsequent(TimeSpan.FromMilliseconds(150), ts);
var expectedResults = ts.CreateHotObservable<char>(
    new Recorded<Notification<char>>(200.MsTicks(), Notification.CreateOnNext('A')),
    new Recorded<Notification<char>>(450.MsTicks(), Notification.CreateOnNext('B')),
    new Recorded<Notification<char>>(500.MsTicks(), Notification.CreateOnNext('C')),
    new Recorded<Notification<char>>(910.MsTicks(), Notification.CreateOnNext('G'))
);

var observer = ts.CreateObserver<char>();
target.Subscribe(observer);
ts.Start();

ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);

et en utilisant

public static class TestingHelpers
{
    public static long MsTicks(this int i)
    {
        return TimeSpan.FromMilliseconds(i).Ticks;
    }
}

Semble passer. Si vous vouliez le réduire, vous pourriez le transformer en ceci :

public static IObservable<TSource> ThrottleSubsequent2<TSource>(this IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
{
    return source.Publish(_source => _source
        .Window(() => _source
            .Select(x => Observable.Interval(dueTime, scheduler))
            .Switch()
        ))
        .Publish(cooldownWindow =>
            Observable.Merge(
                cooldownWindow
                    .SelectMany(o => o.Take(1)),
                cooldownWindow
                    .SelectMany(o => o.Skip(1))
                    .Throttle(dueTime, scheduler)
            )
        );
}

EDIT :

Publish force le partage d'un abonnement. Si vous avez un mauvais (ou coûteux) observable source avec des effets secondaires d'abonnement, Publish permet de s'assurer que l'on ne s'abonne qu'une seule fois. Voici un exemple où Publish aide :

void Main()
{
    var source = UglyRange(10);
    var target = source
        .SelectMany(i => Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10 * i)))
        .ThrottleSubsequent2(TimeSpan.FromMilliseconds(70), Scheduler.Default) //Works with ThrottleSubsequent2, fails with ThrottleSubsequent
        .Subscribe(i => Console.WriteLine(i));
}
static int counter = 0;
public IObservable<int> UglyRange(int limit)
{
    var uglySource = Observable.Create<int>(o =>
    {
        if (counter++ == 0)
        {
            Console.WriteLine("Ugly observable should only be created once.");
            Enumerable.Range(1, limit).ToList().ForEach(i => o.OnNext(i));
        }
        else
        {
            Console.WriteLine($"Ugly observable should only be created once. This is the {counter}th time created.");
            o.OnError(new Exception($"observable invoked {counter} times."));
        }
        return Disposable.Empty;
    });
    return uglySource;
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X