40 votes

Attendre que les threads du pool se terminent

Je suis désolé pour cette question redondante. Cependant, j'ai trouvé de nombreuses solutions à mon problème mais aucune n'est très bien expliquée. J'espère que ce sera clair, ici.

Le thread principal de mon application C# génère 1..n travailleurs en arrière-plan en utilisant le ThreadPool. Je souhaite que le thread d'origine se verrouille jusqu'à ce que tous les travailleurs aient terminé. J'ai fait des recherches sur le ManualResetEvent en particulier, mais je ne comprends pas bien son utilité.

En pseudo :

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

Si nécessaire, je connaîtrai à l'avance le nombre de travailleurs qui vont être mis en file d'attente.

0 votes

Bonjour, consultez un article similaire ici stackoverflow.com/questions/358721/

56voto

JaredPar Points 333733

Essayez ça. La fonction prend une liste de délégués d'action. Elle ajoute une entrée de travailleur ThreadPool pour chaque élément de la liste. Elle attendra que chaque action soit terminée avant de revenir.

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList();
    var handles = new ManualResetEvent[actions.Count()];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    WaitHandle.WaitAll(handles);
}

7 votes

WaitHandle.WaitAll échoue si le nombre de handles est supérieur à ce que le système permet. Sur mon serveur Win2k3, ce nombre est de 64 et je reçois donc une exception lorsque j'essaie de créer plus de 64 éléments...

1 votes

@Eran, essayez d'écrire un SpawAndWaitHelper qui a essentiellement le code ci-dessus. Utilisez SpawAndWait pour diviser l'énumérable en morceaux de taille 64 et appelez l'aide pour chaque morceau.

0 votes

31voto

Marc Gravell Points 482669

Voici une approche différente - l'encapsulation ; ainsi votre code pourrait être aussi simple que :

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

Où le Forker est donnée ci-dessous (je me suis ennuyé dans le train ;-p)... encore une fois, cela évite les objets OS, mais résume les choses de manière assez nette (IMO) :

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}

0 votes

(HI MARC ! Vous vous souvenez de ce post ??) Par curiosité, pourquoi var tmp = obj est-il nécessaire ? Je l'ai implémenté en passant simplement mon objet et j'ai obtenu des résultats bizarres. Le fait d'utiliser var a fini par fonctionner. Il y a manifestement quelque chose que je ne comprends pas ! Merci, et voyez si vous pouvez vous souvenir après seulement deux ans :)

1 votes

La réponse à cette question est un peu compliquée, mais en résumé, c'est parce que C# ne parvient pas à faire exactement ce que vous vouliez sans que vous vous en rendiez compte. Il est généralement assez bon pour faire cela sans ambiguïté à tous les bons endroits, mais pas dans ce cas.

5 votes

Vous devez comprendre que le code delegate { DoSomeWork(tmp); } captures la variable tmp . Chaque appel à ce code capture un différents à chaque fois que l'on fait le tour de la boucle, car tmp est limitée au corps de la boucle. Cependant, le foreach est la variable même à chaque tour de boucle, de sorte que tous les appels à la variable delegate { DoSomeWork(tmp); } capturent la même chose. Cela n'a pas besoin d'être comme ça ; limiter la portée de la variable foreach aurait permis à beaucoup de code de "fonctionner" sans que les gens ne se rendent compte de l'astuce de la situation.

14voto

Marc Gravell Points 482669

Tout d'abord, combien de temps les travailleurs doivent-ils s'exécuter ? Les threads de pool doivent généralement être utilisés pour des tâches de courte durée - s'ils doivent s'exécuter pendant un certain temps, envisagez des threads manuels.

En ce qui concerne le problème, avez-vous réellement besoin de bloquer le thread principal ? Pouvez-vous utiliser un callback à la place ? Si oui, quelque chose comme :

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

Il s'agit d'un moyen assez léger (pas de primitives OS) de suivre les travailleurs.

Si vous necesito pour bloquer, vous pouvez faire la même chose en utilisant un fichier Monitor (encore une fois, en évitant un objet OS) :

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        endOfThread();
        Monitor.Wait(syncLock);
    }
    Console.WriteLine("all done");

2 votes

Votre code attendra infiniment si l'un des délégués lève une exception.

2 votes

Si l'un de ces délégués lève une exception, je vais perdre tout le processus, donc c'est assez arbitraire... Je suppose que ça ne sera pas le cas, mais je vais le rendre explicite ;-p

0 votes

Les travailleurs traiteront des opérations coûteuses, notamment la lecture et l'écriture de fichiers et l'exécution de sélections et d'insertions SQL impliquant des colonnes binaires/image. Il est peu probable qu'ils vivent assez longtemps pour nécessiter des threads explicites, mais on peut gagner en performance en les laissant s'exécuter en parallèle.

9voto

J'ai utilisé la nouvelle bibliothèque de tâches parallèles dans le CTP. aquí :

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });

3voto

Brian Gideon Points 26683

Voici une solution utilisant le CountdownEvent classe.

var complete = new CountdownEvent(1);
foreach (var o in collection)
{
  var capture = o;
  ThreadPool.QueueUserWorkItem((state) =>
    {
      try
      {
        DoSomething(capture);
      }
      finally
      {
        complete.Signal();
      }
    }, null);
}
complete.Signal();
complete.Wait();

Bien sûr, si vous avez accès à la CountdownEvent vous disposez alors de la totalité de la TPL pour travailler. Le site Parallel La classe s'occupe de l'attente pour vous.

Parallel.ForEach(collection, o =>
  {
    DoSomething(o);
  });

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X