53 votes

Implémentation correcte d'un bloc réitérable

Teaser: les gars, cette question n'est pas sur la façon de mettre en œuvre réessayer politique. C'est à propos de l'exécution correcte d'un TPL Dataflow bloc.

Cette question est essentiellement une continuation de ma question précédente Réessayer politique au sein de ITargetBlock. La réponse à cette question a été @svick la solution intelligente qui utilise TransformBlock (source) et TransformManyBlock (cible). Le seul problème reste plus qu'à remplir ce bloc dans un droit chemin: attendez que toutes les tentatives pour faire en premier, puis compléter le bloc cible. Voici ce que j'ai (c'est juste un extrait, ne payez pas trop d'attention à un non-thread-safe retries ):

var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    while (target.InputCount > 0 || retries.Any())
        await Task.Delay(100);

    target.Complete();
});

L'idée est d'effectuer un certain type d'interrogation et de vérifier si il y a encore des messages en attente de traitement et il n'y a pas de messages qui nécessitent une nouvelle tentative. Mais dans cette solution, je n'aime pas l'idée de bureaux de vote.

Oui, je peux encapsuler la logique de l'ajout/suppression de tentatives dans une catégorie distincte, et même par exemple, effectuer une action lorsque l'ensemble des tentatives devient vide, mais la façon de traiter avec target.InputCount > 0 état? Il n'est pas un rappel que quand il n'existe pas de messages en attente pour le bloc, il semble que la vérification de target.ItemCount dans une boucle avec un petit retard est une seule option.

Quelqu'un sait d'une façon plus intelligente pour y parvenir?

2voto

hwcverwe Points 1746

Peut-être qu'un ManualResetEvent peut faire l'affaire pour vous.

Ajouter une propriété publique à TransformManyBlock

 private ManualResetEvent _signal  = new ManualResetEvent(false);
public ManualResetEvent Signal { get { return _signal; } }
 

Et voilà:

 var retries = new HashSet<RetryingMessage<TInput>>();

TransformManyBlock<RetryableMessage<TInput>, TOutput> target = null;
target = new TransformManyBlock<RetryableMessage<TInput>, TOutput>(
    async message =>
    {
        try
        {
            var result = new[] { await transform(message.Data) };
            retries.Remove(message);

            // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
            if(!retries.Any()) Signal.Set(); 
            return result;
        }
        catch (Exception ex)
        {
            message.Exceptions.Add(ex);
            if (message.RetriesRemaining == 0)
            {
                if (failureHandler != null)
                    failureHandler(message.Exceptions);

                retries.Remove(message);

                // Sets the state of the event to signaled, allowing one or more waiting threads to proceed
                if(!retries.Any()) Signal.Set(); 
            }
            else
            {
                retries.Add(message);
                message.RetriesRemaining--;

                Task.Delay(retryDelay)
                    .ContinueWith(_ => target.Post(message));
            }
            return null;
        }
    }, dataflowBlockOptions);

source.LinkTo(target);

source.Completion.ContinueWith(async _ =>
{
    //Blocks the current thread until the current WaitHandle receives a signal.
    target.Signal.WaitOne();

    target.Complete();
});
 

Je ne suis pas sûr de savoir où votre target.InputCount est défini. Donc, à l'endroit où vous modifiez target.InputCount vous pouvez ajouter le code suivant:

 if(InputCount == 0)  Signal.Set();
 

1voto

Lorenzo Dematté Points 3082

Combinant hwcverwe réponse et JamieSee commentaire pourrait être la solution idéale.

Tout d'abord, vous devez créer plus d'un événement:

var signal  = new ManualResetEvent(false);
var completedEvent = new ManualResetEvent(false);

Ensuite, vous devez créer un observateur, et vous abonner à la TransformManyBlock, ainsi vous êtes averti lorsqu'un événement se produit:

var observer = new RetryingBlockObserver<TOutput>(completedEvent);
var observable = target.AsObservable();
observable.Subscribe(observer);

Les observables peut être assez facile:

private class RetryingBlockObserver<T> : IObserver<T> {
        private ManualResetEvent completedEvent;

        public RetryingBlockObserver(ManualResetEvent completedEvent) {                
            this.completedEvent = completedEvent;
        }

        public void OnCompleted() {
            completedEvent.Set();
        }

        public void OnError(Exception error) {
            //TODO
        }

        public void OnNext(T value) {
            //TODO
        }
    }

Et vous pouvez attendre le signal, ou de l'achèvement (épuisement de tous les éléments de source), ou les deux

 source.Completion.ContinueWith(async _ => {

            WaitHandle.WaitAll(completedEvent, signal);
            // Or WaitHandle.WaitAny, depending on your needs!

            target.Complete();
        });

Vous pouvez consulter le résultat de la valeur de WaitAll pour comprendre l'événement qui a été fixé, et de réagir en conséquence. Vous pouvez également ajouter d'autres événements pour le code, en les passant à l'observateur, de sorte qu'il peut en cas de besoin. Vous pouvez différencier votre comportement et réagissent différemment lorsqu'une erreur est générée, par exemple

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X