178 votes

Mise en commun des threads en C++11

Questions pertinentes :

À propos de C++11 :

A propos de Boost :


Comment puis-je obtenir un pool de fils a envoyer des tâches à sans les créer et les supprimer à chaque fois ? Cela signifie que les threads persistants peuvent se resynchroniser sans se joindre.


J'ai un code qui ressemble à ceci :

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

Au lieu de créer et de joindre des threads à chaque itération, je préférerais envoyer des tâches à mes threads de travail à chaque itération et ne les créer qu'une seule fois.

158voto

PhD AP EcE Points 3281

Ceci est adapté de ma réponse à un autre poste très similaire.

Construisons un ThreadPool classe :

class ThreadPool {
public:
    void Start();
    void QueueJob(const std::function<void()>& job);
    void Stop();
    void busy();

private:
    void ThreadLoop();

    bool should_terminate = false;           // Tells threads to stop looking for jobs
    std::mutex queue_mutex;                  // Prevents data races to the job queue
    std::condition_variable mutex_condition; // Allows threads to wait on new jobs or termination 
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> jobs;
};
  1. ThreadPool::Start

Pour une implémentation efficace du pool de threads, une fois que les threads sont créés conformément à num_threads il est préférable de ne pas en créer de nouveaux ou en détruire d'anciens (en les rejoignant). Il y aura une pénalité de performance, et cela pourrait même rendre votre application plus lente que la version série. application plus lente que la version série. Ainsi, nous conservons un pool de threads qui peuvent être utilisés à tout moment (s'ils ne sont pas déjà en train d'exécuter une tâche). s'ils ne sont pas déjà en train d'exécuter un travail).

Chaque thread doit exécuter sa propre boucle infinie, en attendant constamment de nouvelles tâches à saisir et à exécuter.

void ThreadPool::Start() {
    const uint32_t num_threads = std::thread::hardware_concurrency(); // Max # of threads the system supports
    threads.resize(num_threads);
    for (uint32_t i = 0; i < num_threads; i++) {
        threads.at(i) = std::thread(ThreadLoop);
    }
}
  1. ThreadPool::ThreadLoop

La fonction de boucle infinie. Il s'agit d'une while (true) boucle en attendant que la file d'attente des tâches s'ouvre.

void ThreadPool::ThreadLoop() {
    while (true) {
        std::function<void()> job;
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            mutex_condition.wait(lock, [this] {
                return !jobs.empty() || should_terminate;
            });
            if (should_terminate) {
                return;
            }
            job = jobs.front();
            jobs.pop();
        }
        job();
    }
}
  1. ThreadPool::QueueJob

Ajoutez un nouveau travail au pool ; utilisez un verrou pour éviter toute course aux données.

void ThreadPool::QueueJob(const std::function<void()>& job) {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        jobs.push(job);
    }
    mutex_condition.notify_one();
}

Pour l'utiliser :

thread_pool->QueueJob([] { /* ... */ });
  1. ThreadPool::busy

    void ThreadPool::busy() { bool poolbusy; { std::unique_lock<std::mutex> lock(queue_mutex); poolbusy = jobs.empty(); } return poolbusy; }

La fonction busy() peut être utilisée dans une boucle while, de sorte que le thread principal puisse attendre que le threadpool termine toutes les tâches avant d'appeler le destructeur du threadpool.

  1. ThreadPool::Stop

Arrêtez la piscine.

void ThreadPool::Stop() {
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        should_terminate = true;
    }
    mutex_condition.notify_all();
    for (std::thread& active_thread : threads) {
        active_thread.join();
    }
    threads.clear();
}

Une fois que vous avez intégré ces ingrédients, vous disposez de votre propre pool de threads dynamiques. Ces threads fonctionnent en permanence, attendant travail à faire.

Je m'excuse s'il y a des erreurs de syntaxe, j'ai tapé ce code et j'ai une mauvaise mémoire. Désolé de ne pas pouvoir vous fournir le code complet du pool de threads ; cela violerait l'intégrité de mon travail.

Notes :

  • Les blocs de code anonymes sont utilisés de manière à ce que, lorsqu'ils sont quittés, la fonction std::unique_lock les variables créées en leur sein sortent de la portée, débloquant le mutex.
  • ThreadPool::Stop ne mettra pas fin aux travaux en cours, il attendra simplement qu'ils se terminent via la commande active_thread.join() .

104voto

vit-vit Points 66

Vous pouvez utiliser la bibliothèque C++ Thread Pool, https://github.com/vit-vit/ctpl .

Alors le code que vous avez écrit peut être remplacé par le suivant

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

Vous obtiendrez le nombre souhaité de fils et vous n'aurez pas à les créer et à les supprimer sans cesse au fil des itérations.

72voto

Kerrek SB Points 194696

Un pool de threads signifie que tous vos threads sont en cours d'exécution, tout le temps - en d'autres termes, la fonction thread ne revient jamais. Pour donner aux threads quelque chose de significatif à faire, vous devez concevoir un système de communication inter-threads, à la fois pour indiquer au thread qu'il y a quelque chose à faire, et pour communiquer les données de travail réelles.

En général, cela implique une sorte de structure de données concurrente, et chaque thread dormirait probablement sur une sorte de variable de condition, qui serait notifiée lorsqu'il y a du travail à faire. Dès réception de la notification, un ou plusieurs des threads se réveillent, récupèrent une tâche dans la structure de données concurrente, la traitent et stockent le résultat de manière analogue.

Le fil vérifie ensuite s'il y a encore du travail à faire et, dans le cas contraire, se rendort.

Le résultat est que vous devez concevoir tout cela vous-même, car il n'existe pas de notion naturelle du "travail" qui soit universellement applicable. C'est un travail considérable, et il y a quelques problèmes subtils que vous devez régler. (Vous pouvez programmer en Go si vous aimez un système qui s'occupe de la gestion des threads pour vous en coulisse).

21voto

didierc Points 8128

Un threadpool est au fond un ensemble de threads tous liés à une fonction fonctionnant comme une boucle d'événements. Ces threads attendent sans fin l'exécution d'une tâche ou leur propre fin.

Le rôle du pool de threads est de fournir une interface permettant de soumettre des tâches, de définir (et éventuellement de modifier) la politique d'exécution de ces tâches (règles d'ordonnancement, instanciation des threads, taille du pool), et de surveiller l'état des threads et des ressources connexes.

Ainsi, pour un pool polyvalent, il faut commencer par définir ce qu'est une tâche, comment elle est lancée, interrompue, quel est le résultat (voir la notion de promesse et de futur pour cette question), à quel type d'événements les threads devront répondre, comment ils les traiteront, comment ces événements seront discriminés de ceux traités par les tâches. Comme vous pouvez le constater, cela peut devenir assez compliqué et imposer des restrictions sur la façon dont les threads vont travailler, car la solution devient de plus en plus complexe.

L'outillage actuel pour la gestion des événements est assez dépouillé(*) : des primitives comme les mutex, les variables de condition, et quelques abstractions par-dessus le marché (verrous, barrières). Mais dans certains cas, ces abstractions peuvent s'avérer inadaptées (voir cet article lié à l'article de l'OCDE sur la gestion des événements). question ), et on doit revenir à l'utilisation des primitives.

D'autres problèmes doivent également être gérés :

  • signal
  • i/o
  • le matériel (affinité des processeurs, configuration hétérogène)

Comment cela se passerait-il dans votre contexte ?

Cette réponse à une question similaire renvoie à une implémentation existante destinée à Boost et au stl.

J'ai offert un mise en œuvre très grossière d'un pool de threads pour une autre question, qui ne répond pas à de nombreux problèmes exposés ci-dessus. Vous pourriez avoir envie de vous appuyer dessus. Vous pouvez également jeter un coup d'oeil aux frameworks existants dans d'autres langues, pour trouver de l'inspiration.


(*) Je ne vois pas cela comme un problème, bien au contraire. Je pense que c'est l'esprit même du C++ hérité du C.

12voto

pio Points 95
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

function_pool.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

main.cpp

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X