62 votes

Code pour un pool de threads simple en C#

Je recherche un exemple de code (C#) pour la mise en œuvre d'un pool de threads simple.

J'en ai trouvé un sur codeproject, mais la base de code était énorme et je n'ai pas besoin de toutes ces fonctionnalités.

C'est plus à des fins éducatives de toute façon.

5 votes

La réponse courte est que vous ne devriez pas écrire votre propre version, sauf s'il s'agit d'un exercice d'apprentissage. Si c'est un exercice d'apprentissage, vous apprendrez davantage en l'écrivant vous-même qu'en copiant le code de quelqu'un d'autre :)

2 votes

@Greg : N'y a-t-il pas de circonstances où vous pourriez vouloir un pool de threads indépendant du ThreadPool standard existant ?

1 votes

@Anthony : En lisant ce qui a été fait pour le threadpool intégré dans les différents posts de Joe Duffy (et d'autres), je suis raisonnablement confiant que n'importe quel threadpool que je mettrais ensemble serait beaucoup plus faible que celui qui existe déjà.

53voto

Milan Gardian Points 6596

Il s'agit de l'implémentation la plus simple et naïve du pool de threads que j'ai pu réaliser à des fins éducatives (C# / .NET 3.5). Elle n'utilise en aucun cas l'implémentation du pool de threads de .NET.

using System;
using System.Collections.Generic;
using System.Threading;

namespace SimpleThreadPool
{
    public sealed class Pool : IDisposable
    {
        public Pool(int size)
        {
            this._workers = new LinkedList<Thread>();
            for (var i = 0; i < size; ++i)
            {
                var worker = new Thread(this.Worker) { Name = string.Concat("Worker ", i) };
                worker.Start();
                this._workers.AddLast(worker);
            }
        }

        public void Dispose()
        {
            var waitForThreads = false;
            lock (this._tasks)
            {
                if (!this._disposed)
                {
                    GC.SuppressFinalize(this);

                    this._disallowAdd = true; // wait for all tasks to finish processing while not allowing any more new tasks
                    while (this._tasks.Count > 0)
                    {
                        Monitor.Wait(this._tasks);
                    }

                    this._disposed = true;
                    Monitor.PulseAll(this._tasks); // wake all workers (none of them will be active at this point; disposed flag will cause then to finish so that we can join them)
                    waitForThreads = true;
                }
            }
            if (waitForThreads)
            {
                foreach (var worker in this._workers)
                {
                    worker.Join();
                }
            }
        }

        public void QueueTask(Action task)
        {
            lock (this._tasks)
            {
                if (this._disallowAdd) { throw new InvalidOperationException("This Pool instance is in the process of being disposed, can't add anymore"); }
                if (this._disposed) { throw new ObjectDisposedException("This Pool instance has already been disposed"); }
                this._tasks.AddLast(task);
                Monitor.PulseAll(this._tasks); // pulse because tasks count changed
            }
        }

        private void Worker()
        {
            Action task = null;
            while (true) // loop until threadpool is disposed
            {
                lock (this._tasks) // finding a task needs to be atomic
                {
                    while (true) // wait for our turn in _workers queue and an available task
                    {
                        if (this._disposed)
                        {
                            return;
                        }
                        if (null != this._workers.First && object.ReferenceEquals(Thread.CurrentThread, this._workers.First.Value) && this._tasks.Count > 0) // we can only claim a task if its our turn (this worker thread is the first entry in _worker queue) and there is a task available
                        {
                            task = this._tasks.First.Value;
                            this._tasks.RemoveFirst();
                            this._workers.RemoveFirst();
                            Monitor.PulseAll(this._tasks); // pulse because current (First) worker changed (so that next available sleeping worker will pick up its task)
                            break; // we found a task to process, break out from the above 'while (true)' loop
                        }
                        Monitor.Wait(this._tasks); // go to sleep, either not our turn or no task to process
                    }
                }

                task(); // process the found task
                lock(this._tasks)
                {
                    this._workers.AddLast(Thread.CurrentThread);
                }
                task = null;
            }
        }

        private readonly LinkedList<Thread> _workers; // queue of worker threads ready to process actions
        private readonly LinkedList<Action> _tasks = new LinkedList<Action>(); // actions to be processed by worker threads
        private bool _disallowAdd; // set to true when disposing queue but there are still tasks pending
        private bool _disposed; // set to true when disposing queue and no more tasks are pending
    }

    public static class Program
    {
        static void Main()
        {
            using (var pool = new Pool(5))
            {
                var random = new Random();
                Action<int> randomizer = (index =>
                {
                    Console.WriteLine("{0}: Working on index {1}", Thread.CurrentThread.Name, index);
                    Thread.Sleep(random.Next(20, 400));
                    Console.WriteLine("{0}: Ending {1}", Thread.CurrentThread.Name, index);
                });

                for (var i = 0; i < 40; ++i)
                {
                    var i1 = i;
                    pool.QueueTask(() => randomizer(i1));
                }
            }
        }
    }
}

1 votes

+1 Merci. J'utilisais ce snippet mais après une période de temps extrêmement longue, j'ai rencontré une erreur : Unhandled Exception: System.NullReferenceException: Object reference not set to an instance of an object. at System.Collections.Generic.LinkedList'1.InternalInsertNodeBe‌​fore(LinkedListNode>‌​' node, LinkedListNode'1 newNode) at System.Collections.Generic.LinkedList'1.AddLast(T value) at Prog.Pool.Worker()`. Une idée de la cause de ce problème ?

2 votes

@Legend je ne suis pas sûr de l'origine du problème, mais si je devais deviner, je dirais que c'est lié au fait que _workers La liste liée est accessible en dehors du verrou. Si vous utilisez .NET 4, vous pouvez essayer d'utiliser ConcurrentQueue<Action> à la place.

1 votes

+1 Merci. Vous avez raison. J'ai posé une question ici : stackoverflow.com/questions/16763626/ Il semble que le problème était effectivement dû à la serrure manquante. Merci pour votre temps. J'utilise actuellement .NET 3.5 et cela fonctionne à merveille.

31voto

GEOCHET Points 13787

Il n'est pas nécessaire de mettre en œuvre sa propre application, car il n'est pas très difficile d'utiliser l'application .NET existante.

De Documentation sur le ThreadPool :

using System;
using System.Threading;

public class Fibonacci
{
    public Fibonacci(int n, ManualResetEvent doneEvent)
    {
        _n = n;
        _doneEvent = doneEvent;
    }

    // Wrapper method for use with thread pool.
    public void ThreadPoolCallback(Object threadContext)
    {
        int threadIndex = (int)threadContext;
        Console.WriteLine("thread {0} started...", threadIndex);
        _fibOfN = Calculate(_n);
        Console.WriteLine("thread {0} result calculated...", threadIndex);
        _doneEvent.Set();
    }

    // Recursive method that calculates the Nth Fibonacci number.
    public int Calculate(int n)
    {
        if (n <= 1)
        {
            return n;
        }

        return Calculate(n - 1) + Calculate(n - 2);
    }

    public int N { get { return _n; } }
    private int _n;

    public int FibOfN { get { return _fibOfN; } }
    private int _fibOfN;

    private ManualResetEvent _doneEvent;
}

public class ThreadPoolExample
{
    static void Main()
    {
        const int FibonacciCalculations = 10;

        // One event is used for each Fibonacci object
        ManualResetEvent[] doneEvents = new ManualResetEvent[FibonacciCalculations];
        Fibonacci[] fibArray = new Fibonacci[FibonacciCalculations];
        Random r = new Random();

        // Configure and launch threads using ThreadPool:
        Console.WriteLine("launching {0} tasks...", FibonacciCalculations);
        for (int i = 0; i < FibonacciCalculations; i++)
        {
            doneEvents[i] = new ManualResetEvent(false);
            Fibonacci f = new Fibonacci(r.Next(20,40), doneEvents[i]);
            fibArray[i] = f;
            ThreadPool.QueueUserWorkItem(f.ThreadPoolCallback, i);
        }

        // Wait for all threads in pool to calculation...
        WaitHandle.WaitAll(doneEvents);
        Console.WriteLine("All calculations are complete.");

        // Display the results...
        for (int i= 0; i<FibonacciCalculations; i++)
        {
            Fibonacci f = fibArray[i];
            Console.WriteLine("Fibonacci({0}) = {1}", f.N, f.FibOfN);
        }
    }
}

8 votes

Le pool de threads a d'énormes limitations

15 votes

Un pool par domaine d'application, on ne peut pas essayer d'interrompre un thread en attente, etc. Il existe des tas d'informations à ce sujet. stackoverflow.com/questions/145304 codeproject.com/KB/threads/smartthreadpool.aspx codeproject.com/KB/threads/cancellablethreadpool.aspx

1 votes

@Jeffrey : Où sont ces limitations évoquées par le PO ? Où, dans le PO, voyez-vous une preuve que le PO doit créer son propre pool de fils ?

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X