80 votes

Comment puis-je empêcher les continuations synchrones sur une tâche?

J'ai une bibliothèque (prise réseau) code qui fournit une Task-en fonction de l'API pour l'attente des réponses à des demandes en fonction TaskCompletionSource<T>. Cependant, il y a une gêne dans le TPL, il semble impossible d'empêcher synchrone continuations. Ce que je voudrais , comme pour être en mesure de faire est soit:

  • dire à un TaskCompletionSource<T> qui ne devrait pas permettre aux appelants de se joindre avec des TaskContinuationOptions.ExecuteSynchronously, ou
  • définir le résultat (SetResult / TrySetResult) d'une manière qui indique que TaskContinuationOptions.ExecuteSynchronously devraient être ignorées, et l'utilisation de la piscine à la place

Plus précisément, le problème que j'ai est que les données entrantes sont traitées par un lecteur dédié, et si un appelant peut fixer avec TaskContinuationOptions.ExecuteSynchronously ils peuvent enrayer le lecteur (qui ne touche pas uniquement eux). Auparavant, j'ai travaillé autour de cette quelques hackery qui détecte si toutes les suites sont présents, et si elles le sont, il pousse l'achèvement sur l' ThreadPool, cependant cela a un impact significatif si l'appelant a saturé leur file d'attente de travail, que la fin ne sera pas obtenir traitées en temps opportun. Si elles sont à l'aide de Task.Wait() (ou similaire), ils seront alors essentiellement de blocage eux-mêmes. De même, c'est pourquoi le lecteur est sur un thread dédié plutôt que d'utiliser des travailleurs.

Donc, avant j'ai essayer et de nag le TPL de l'équipe: j'ai loupé une option?

Points clés:

  • Je ne veux pas que les appelants externes pour être en mesure de pirater mon thread
  • Je ne peux pas utiliser l' ThreadPool comme une mise en œuvre, comme il doit fonctionner lorsque la piscine est saturé

L'exemple ci-dessous, produit un résultat (la commande peut varier en fonction du moment):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Le problème, c'est le fait qu'un hasard de l'appelant a réussi à obtenir une continuation de "fil conducteur". Dans le code réel, ce serait d'interrompre la primaire reader; de mauvaises choses!

Code:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

48voto

Eli Arbel Points 5266

Nouvelle dans .NET 4.6:

Le dernier aperçu de .NET de 4,6 contient un nouveau TaskCreationOptions: RunContinuationsAsynchronously.


Depuis que vous êtes prêt à utiliser la Réflexion pour accéder aux champs privés...

Vous pouvez marquer le SDC de la Tâche avec l' TASK_STATE_THREAD_WAS_ABORTED drapeau, qui serait la cause de tous les continuations de ne pas être insérée.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Edit:

Au lieu d'utiliser la Réflexion émettent, je vous suggère d'utiliser des expressions. C'est beaucoup plus lisible et a l'avantage d'être PCL-compatible:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Sans l'aide de la Réflexion:

Si ça intéresse quelqu'un, j'ai trouvé un moyen de le faire sans Réflexion, mais c'est un peu "sale" ainsi, et des cours comporte une partie non négligeable de la perf de pénalité:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

9voto

Noseratio Points 23840

Je ne pense pas qu'il y ait quelque chose dans le TPL qui apporterait explicite de l'API de contrôle TaskCompletionSource.SetResult des continuations. J'ai décidé de garder mon début de réponse pour le contrôle de ce comportement pour async/await scénarios.

Voici une autre solution qui impose asynchrone sur ContinueWith, si l' tcs.SetResult-déclenché la poursuite a lieu sur le même thread que l' SetResult a été appelée sur:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Mis à jour pour tenir compte de ce commentaire:

Je n'ai pas de contrôle de l'appelant - je ne peux pas les obtenir à utiliser un spécifique continuer-avec la variante: si je pouvais, le problème n'existerait pas dans le la première place

Je n'étais pas au courant que vous n'avez pas de contrôle de l'appelant. Néanmoins, si vous ne contrôlez pas, vous n'êtes probablement pas passer l' TaskCompletionSource objet directement à l'appelant, soit. Logiquement, vous seriez de passage du jeton de partie, c'est à dire tcs.Task. Dans ce cas, la solution pourrait être encore plus facile, en ajoutant une autre méthode d'extension vers le haut:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Utilisation:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Cela fonctionne pour les deux await et ContinueWith (violon) et est libre de réflexion hacks.

4voto

Ivan Zlatanov Points 2317

Qu'en est-il au lieu de faire

 var task = source.Task;
 

vous faites cela à la place

 var task = source.Task.ContinueWith<Int32>( x => x.Result );
 

Ainsi, vous ajoutez toujours une continuation qui sera exécutée de manière asynchrone, peu importe si les abonnés souhaitent une continuation dans le même contexte. C'est en quelque sorte la tâche à accomplir, n'est-ce pas?

3voto

Noseratio Points 23840

Mise à jour, j'ai posté une réponse distincte pour traiter ContinueWith plutôt await (parce qu' ContinueWith ne se soucie pas de l'actuel contexte de synchronisation).

Vous pouvez utiliser un muet contexte de synchronisation pour imposer l'asynchronie lors de la poursuite déclenchée par l'appelant SetResult/SetCancelled/SetException sur TaskCompletionSource. Je crois que l'actuel contexte de synchronisation (en point de await tcs.Task) est le critère TPL utilise pour décider de l'opportunité de la poursuite synchrone ou asynchrone.

Le suivant fonctionne pour moi:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync est mis en œuvre comme ceci:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext est très très cher en termes de frais généraux, il ajoute. En fait, une approche similaire est prise par la mise en œuvre de WPF Dispatcher.BeginInvoke.

TPL compare la cible contexte de synchronisation au point de await à celle du point de tcs.SetResult. Si le contexte de synchronisation est le même (ou il n'y a pas de contexte de synchronisation dans les deux endroits), la suite est appelée directement, de façon synchrone. Sinon, c'est en file d'attente à l'aide de SynchronizationContext.Post sur la cible contexte de synchronisation, c'est à dire, la normale await comportement. Ce que cette approche n'est toujours imposer l' SynchronizationContext.Post comportement (ou un pool de thread suite si il n'y a pas de cible contexte de synchronisation).

Mise à jour, cela ne fonctionne pas pour task.ContinueWithcar ContinueWith ne se soucie pas de l'actuel contexte de synchronisation. Il fonctionne cependant pour await task (violon). Il fait également travailler pour await task.ConfigureAwait(false).

Otoh, que, cette approche fonctionne pour ContinueWith.

3voto

Fredou Points 9553

si vous pouvez et êtes prêt à utiliser la réflexion, cela devrait le faire;

 public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}
 

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X