146 votes

C++0x n'a pas de sémaphores ? Comment synchroniser les threads ?

Est-il vrai que le C++0x sera dépourvu de sémaphores ? Il y a déjà quelques questions sur Stack Overflow concernant l'utilisation des sémaphores. Je les utilise (sémaphores posix) tout le temps pour laisser un thread attendre un événement dans un autre thread :

void thread0(...)
{
  doSomething0();

  event1.wait();

  ...
}

void thread1(...)
{
  doSomething1();

  event1.post();

  ...
}

Si je faisais ça avec un mutex :

void thread0(...)
{
  doSomething0();

  event1.lock(); event1.unlock();

  ...
}

void thread1(...)
{
  event1.lock();

  doSomethingth1();

  event1.unlock();

  ...
}

Problème : c'est laid et il n'est pas garanti que le thread1 verrouille le mutex en premier (étant donné que le même thread doit verrouiller et déverrouiller un mutex, vous ne pouvez pas non plus verrouiller l'événement1 avant que le thread0 et le thread1 ne commencent).

Puisque boost n'a pas non plus de sémaphores, quel est le moyen le plus simple de réaliser ce qui précède ?

1 votes

Peut-être utiliser la condition mutex et std::promise et std::future ?

9voto

onqtam Points 2107

Vous pouvez également consulter cpp11-sur-multicore - il dispose d'une implémentation portable et optimale du sémaphore.

Le référentiel contient également d'autres fonctionnalités de threading qui complètent le threading de c++11.

8voto

Vous pouvez travailler avec des mutex et des variables de condition. Vous obtenez un accès exclusif avec le mutex, vérifiez si vous voulez continuer ou si vous devez attendre l'autre extrémité. Si vous devez attendre, vous attendez dans une condition. Lorsque l'autre thread détermine que vous pouvez continuer, il signale la condition.

Il y a un court exemple dans la bibliothèque boost::thread que vous pouvez très probablement copier (les librairies C++0x et boost thread sont très similaires).

0 votes

Conditionner les signaux uniquement aux threads en attente, ou pas ? Donc si le thread0 n'est pas là à attendre quand le thread1 le signale, il sera bloqué plus tard ? De plus, je n'ai pas besoin du verrou supplémentaire qui accompagne la condition - c'est une surcharge.

0 votes

Oui, la condition ne signale que les fils en attente. Le modèle commun est d'avoir une variable avec l'état et une condition au cas où vous devez attendre. Pensez à un producteur/consommateur, il y aura un compte sur les éléments dans le tampon, le producteur verrouille, ajoute l'élément, incrémente le compte et signale. Le consommateur se verrouille, vérifie le compteur et s'il est différent de zéro, consomme, tandis que s'il est égal à zéro, il attend dans la condition.

2 votes

Vous pouvez simuler un sémaphore de cette façon : Initialisez une variable avec la valeur que vous donneriez au sémaphore, puis wait() est traduit par "verrouiller, vérifier le compte si non nul décrémenter et continuer ; si nul attendre sur condition" tandis que post serait "verrouiller, incrémenter le compteur, signaler si c'est 0"

3voto

slasla Points 11

Peut aussi être utile comme enveloppe de sémaphore RAII dans les threads :

class ScopedSemaphore
{
public:
    explicit ScopedSemaphore(Semaphore& sem) : m_Semaphore(sem) { m_Semaphore.Wait(); }
    ScopedSemaphore(const ScopedSemaphore&) = delete;
    ~ScopedSemaphore() { m_Semaphore.Notify(); }

   ScopedSemaphore& operator=(const ScopedSemaphore&) = delete;

private:
    Semaphore& m_Semaphore;
};

Exemple d'utilisation dans une application multithread :

boost::ptr_vector<std::thread> threads;
Semaphore semaphore;

for (...)
{
    ...
    auto t = new std::thread([..., &semaphore]
    {
        ScopedSemaphore scopedSemaphore(semaphore);
        ...
    }
    );
    threads.push_back(t);
}

for (auto& t : threads)
    t.join();

2voto

Copperpot Points 197

J'ai trouvé que les shared_ptr et weak_ptr, un long avec une liste, faisaient le travail dont j'avais besoin. Mon problème était que j'avais plusieurs clients qui voulaient interagir avec les données internes d'un hôte. Typiquement, l'hôte met à jour les données par lui-même, cependant, si un client le demande, l'hôte doit arrêter la mise à jour jusqu'à ce qu'aucun client n'accède aux données de l'hôte. En même temps, un client peut demander un accès exclusif, de sorte qu'aucun autre client, ni l'hôte, ne puisse modifier les données de l'hôte.

Pour ce faire, j'ai créé une structure :

struct UpdateLock
{
    typedef std::shared_ptr< UpdateLock > ptr;
};

Chaque client aurait un membre de ce type :

UpdateLock::ptr m_myLock;

L'hôte aurait alors un membre weak_ptr pour l'exclusivité, et une liste de weak_ptrs pour les verrous non exclusifs :

std::weak_ptr< UpdateLock > m_exclusiveLock;
std::list< std::weak_ptr< UpdateLock > > m_locks;

Il existe une fonction pour activer le verrouillage, et une autre pour vérifier si l'hôte est verrouillé :

UpdateLock::ptr LockUpdate( bool exclusive );       
bool IsUpdateLocked( bool exclusive ) const;

Je teste les verrous dans LockUpdate, IsUpdateLocked, et périodiquement dans la routine de mise à jour de l'hôte. Tester un verrou est aussi simple que de vérifier si le weak_ptr a expiré, et de supprimer tout expiré de la liste m_locks (je ne le fais que pendant la mise à jour de l'hôte), je peux vérifier si la liste est vide ; en même temps, j'obtiens un déverrouillage automatique quand un client réinitialise le shared_ptr auquel il est accroché, ce qui arrive aussi quand un client est détruit automatiquement.

L'effet global est que, puisque les clients ont rarement besoin d'exclusivité (généralement réservée aux ajouts et aux suppressions), la plupart du temps, une demande de LockUpdate( false ), c'est-à-dire de non-exclusivité, aboutit tant que ( ! m_exclusiveLock). Et une demande de LockUpdate( true ), c'est-à-dire d'exclusivité, ne réussit que si à la fois ( ! m_exclusiveLock) et (m_locks.empty()).

Une file d'attente pourrait être ajoutée pour atténuer les collisions entre les verrous exclusifs et non exclusifs, mais comme je n'ai pas eu de collisions jusqu'à présent, j'ai l'intention d'attendre que cela se produise pour ajouter la solution (surtout pour avoir une condition de test réelle).

Jusqu'à présent, cela fonctionne bien pour mes besoins ; je peux imaginer la nécessité d'étendre ce système et les problèmes qui pourraient survenir en cas d'utilisation étendue, mais cela a été rapide à mettre en œuvre et a nécessité très peu de code personnalisé.

-2voto

user Points 1825

Différent des autres réponses, je propose une nouvelle version qui :

  1. Débloque tous les threads en attente avant d'être supprimé. Dans ce cas, la suppression du sémaphore réveillera tous les threads en attente et ce n'est qu'après le réveil de tous que le destructeur du sémaphore sortira.
  2. A un paramètre à la wait() pour déverrouiller automatiquement le thread appelant après l'écoulement du délai d'attente en millisecondes.
  3. A une option sur le constructeur pour limiter le nombre de ressources disponibles seulement jusqu'au nombre avec lequel le sémaphore a été initialisé. De cette façon, l'appel à notify() trop de fois n'augmentera pas le nombre de ressources du sémaphore.

    include <stdio.h>

    include <thread>

    include <mutex>

    include <condition_variable>

    include <iostream>

    std::recursive_mutex g_sync_mutex;

    define sync(x) do { \

        std::unique_lock<std::recursive_mutex> lock(g_sync_mutex); \
        x; \
    } while (false);

    class Semaphore { int _count; bool _limit; int _all_resources; int _wakedup; std::mutex _mutex; std::condition_variable_any _condition_variable;

    public: /**

    • count - how many resources this semaphore holds
    • limit - limit notify() calls only up to the count value (available resources)
      */
      Semaphore (int count, bool limit)
      _count(count), _limit(limit), _all_resources(count), _wakedup(count) { }

      /**

    • Unlock all waiting threads before destructing the semaphore (to avoid their segfalt later) */ virtual ~Semaphore () { std::unique_lock<std::mutex> lock(_mutex); _wakeup(lock); }

      void _wakeup(std::unique_lock<std::mutex>& lock) { int lastwakeup = 0;

      while( _wakedup < _all_resources ) {
          lock.unlock();
          notify();
          lock.lock();
          // avoids 100% CPU usage if someone is not waking up properly
          if (lastwakeup == _wakedup) {
              std::this_thread::sleep_for( std::chrono::milliseconds(10) );
          }
          lastwakeup = _wakedup;
      }

      }

      // Mutex and condition variables are not movable and there is no need for smart pointers yet Semaphore(const Semaphore&) = delete; Semaphore& operator =(const Semaphore&) = delete; Semaphore(const Semaphore&&) = delete; Semaphore& operator =(const Semaphore&&) = delete;

      /**

    • Release one acquired resource. */ void notify() { std::unique_lock<std::mutex> lock(_mutex); // sync(std::cerr << getTime() << "Calling notify(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl); _count++; if (_limit && _count > _all_resources) { _count = _all_resources; } _condition_variable.notify_one(); }

      /**

    • This function never blocks!
    • Return false if it would block when acquiring the lock. Otherwise acquires the lock and return true. */ bool try_acquire() { std::unique_lock<std::mutex> lock(_mutex); // sync(std::cerr << getTime() << "Calling try_acquire(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl); if(_count <= 0) { return false; } _count--; return true; }

      /**

    • Return true if the timeout expired, otherwise return false.
    • timeout - how many milliseconds to wait before automatically unlocking the wait() call. */ bool wait(int timeout = 0) { std::unique_lock<std::mutex> lock(_mutex); // sync(std::cerr << getTime() << "Calling wait(" << _count << ", " << _limit << ", " << _all_resources << ")" << std::endl); _count--; _wakedup--; try { std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();

          while(_count < 0) {
              if (timeout < 1) {
                  _condition_variable.wait(lock);
              }
              else {
                  std::cv_status status = _condition_variable.wait_until(lock, timenow + std::chrono::milliseconds(timeout));
      
                  if ( std::cv_status::timeout == status) {
                      _count++;
                      _wakedup++;
                      return true;
                  }
              }
          }
      }
      catch (...) {
          _count++;
          _wakedup++;
          throw;
      }
      _wakedup++;
      return false;

      }

      /**

    • Return true if calling wait() will block the calling thread */ bool locked() { std::unique_lock<std::mutex> lock(_mutex); return _count <= 0; }

      /**

    • Return true the semaphore has at least all resources available (since when it was created) */ bool freed() { std::unique_lock<std::mutex> lock(_mutex); return _count >= _all_resources; }

      /**

    • Return how many resources are available:
      • 0 means not free resources and calling wait() will block te calling thread
      • a negative value means there are several threads being blocked
      • a positive value means there are no threads waiting */ int count() { std::unique_lock<std::mutex> lock(_mutex); return _count; }

      /**

    • Wake everybody who is waiting and reset the semaphore to its initial value. */ void reset() { std::unique_lock<std::mutex> lock(_mutex); if(_count < 0) { _wakeup(lock); } _count = _all_resources; } };

Utilitaire permettant d'imprimer l'horodatage actuel :

std::string getTime() {
    char buffer[20];
#if defined( WIN32 )
    SYSTEMTIME wlocaltime;
    GetLocalTime(&wlocaltime);
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
    std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
    duration -= hours;
    auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
    duration -= minutes;
    auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
    duration -= seconds;
    auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
    duration -= milliseconds;
    time_t theTime = time( NULL );
    struct tm* aTime = localtime( &theTime );
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
    return buffer;
}

Exemple de programme utilisant ce sémaphore :

// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
    std::cerr << getTime() << "Creating Semaphore" << std::endl;
    Semaphore* semaphore = new Semaphore(1, false);
    semaphore->wait(1000);
    semaphore->wait(1000);
    std::cerr << getTime() << "Auto Unlocking Semaphore wait" << std::endl;

    std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
    delete semaphore;

    std::cerr << getTime() << "Exiting after 10 seconds..." << std::endl;
    return 0;
}

Exemple de sortie :

11:03:01.012 Creating Semaphore
11:03:02.012 Auto Unlocking Semaphore wait
11:03:07.012 Exiting after 10 seconds...

Fonction supplémentaire qui utilise une EventLoop pour débloquer les sémaphores après un certain temps :

std::shared_ptr<std::atomic<bool>> autowait(Semaphore* semaphore, int timeout, EventLoop<std::function<void()>>& eventloop, const char* source) {
    std::shared_ptr<std::atomic<bool>> waiting(std::make_shared<std::atomic<bool>>(true));
    sync(std::cerr << getTime() << "autowait '" << source << "'..." << std::endl);

    if (semaphore->try_acquire()) {
        eventloop.enqueue( timeout, [waiting, source, semaphore]{
            if ( (*waiting).load() ) {
                sync(std::cerr << getTime() << "Timeout '" << source << "'..." << std::endl);
                semaphore->notify();
            }
        } );
    }
    else {
        semaphore->wait(timeout);
    }
    return waiting;
}

Semaphore semaphore(1, false);
EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);
std::shared_ptr<std::atomic<bool>> waiting_something = autowait(&semaphore, 45000, eventloop, "waiting_something");

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X