4 votes

Rx : Attendre le premier élément pendant un certain temps

J'aimerais transformer mon ancienne méthode basée sur les événements en méthode basée sur les observables, mais je suis assez novice en matière de Rx, et je suis donc coincé.

J'ai une source d'événement, qui est maintenant un observable. À un moment donné, je dois lancer une méthode qui se termine soit par le retour de l'élément suivant de la ligne, soit par un résultat nul si elle est expirée.

L'approche basée sur les événements se présente comme suit :

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;
    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);

        EventHandler<ReaderEvent> localHandler = (o, e) =>
        {
            if (e.PlaceId == PlaceId)
            {
                result = e;
                cts.Cancel();
            }
        };

        ReaderEventHandler += localHandler;
        try
        {
            await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) { }
        catch (Exception ex)
        {
            //...
        }

        ReaderEventHandler -= localHandler;
    }

    return result;
}

Comme vous pouvez le voir, l'idée est que le délai est annulé soit par l'arrivée de l'événement que j'attends, soit que la source de jetons est annulée par configuration après ce délai spécifique. Assez propre.

Maintenant, la version Rx :

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    ReaderEvent result = null;

    var observable = _OnReaderEvent.FirstAsync(r => r.PlaceId == PlaceId);

    using (var cts = CancellationTokenSource.CreateLinkedTokenSource(new [] { topLevelToken }))
    {
        cts.CancelAfter(waitFor);
        using (observable.Subscribe(x => {
            result = x;
            cts.Cancel();
        {
            try
            {
                await Task.Delay(waitFor, cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) { }
        }
    }
    return result;
}

Pas si propre... même pire... J'ai aussi essayé avec l'extension Timeout. Mais comme il s'agit d'un abonnement unique, j'ai toujours besoin d'attendre quelque part avant de disposer de l'abonnement. La seule différence serait que le OnError annulerait le jeton local, et non le mécanisme intégré de CancelAfter.

Existe-t-il un moyen plus efficace / plus concis (qui s'appuie davantage sur le Rx) de faire cela ?

Merci !

3voto

Vous pourriez essayer avec :

var values = await _OnReaderEvent
  .Where(r => r.PlaceId == placeId)
  .Buffer(waitFor, 1)
  .FirstAsync(); // get list of matching elements during waitFor time

return values.FirstOrDefault(); // return first element or null if the list is empty

1voto

Enigmativity Points 26345

Pourquoi ne pas simplement opter pour une version Rx simple du code :

public async Task<ReaderEvent> WaitForReaderAsync(int PlaceId, TimeSpan waitFor)
{
    return await
        _OnReaderEvent
            .Where(r => r.PlaceId == PlaceId)
            .Buffer(waitFor, 1)
            .Select(xs => xs.FirstOrDefault())
            .FirstOrDefaultAsync()
            .ToTask();
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X