39 votes

Ordre atomique de la mémoire en C++11 - est-ce une utilisation correcte de l'ordre relaxé (release-consume) ?

J'ai récemment réalisé un portage vers C++11 en utilisant std::atomic d'un triple tampon à utiliser comme mécanisme de synchronisation de la concurrence. L'idée derrière cette approche de synchronisation des threads est que pour une situation producteur-consommateur où vous avez un producteur qui exécute plus rapide que le consommateur, le triple buffering peut présenter certains avantages puisque le thread producteur ne sera pas "ralenti" par l'attente du consommateur. Dans mon cas, j'ai un thread physique qui est mis à jour à ~120fps, et un thread de rendu qui fonctionne à ~60fps. Évidemment, je veux que le fil de rendu obtienne toujours l'état le plus récent possible, mais je sais aussi que je vais sauter beaucoup d'images du fil physique, en raison de la différence de taux. D'un autre côté, je veux que mon fil physique maintienne son taux de mise à jour constant et ne soit pas limité par le fil de rendu plus lent qui verrouille mes données.

Le code C original a été fait par remis-thoughts et l'explication complète est dans son blog . J'encourage toute personne intéressée à le lire pour mieux comprendre la mise en œuvre originale.

Ma mise en œuvre peut être trouvée aquí .

L'idée de base est d'avoir un tableau avec 3 positions (buffers) et un drapeau atomique qui est comparé et échangé pour définir quels éléments du tableau correspondent à quel état, à tout moment donné. De cette façon, une seule variable atomique est utilisée pour modéliser les 3 index du tableau et la logique derrière la triple mise en mémoire tampon. Les 3 positions du tampon sont nommées Dirty, Clean et Snap. Le site producteur écrit toujours dans l'index Dirty, et peut retourner l'auteur pour échanger l'index Dirty avec l'index Clean actuel. Le site consommateur peut demander un nouveau Snap, qui échange l'index Snap actuel avec l'index Clean pour obtenir le tampon le plus récent. Le site consommateur lit toujours le tampon dans la position Snap.

Le drapeau consiste en un int non signé de 8 bits et les bits correspondent à :

(inutilisé) (nouvelle écriture) (2x sale) (2x propre) (2x snap)

L'indicateur de bit supplémentaire newWrite est placé par le rédacteur et effacé par le lecteur. Le lecteur peut l'utiliser pour vérifier s'il y a eu des écritures depuis le dernier snap, et si non, il ne prendra pas un autre snap. L'indicateur et les index peuvent être obtenus en utilisant des opérations simples de type bit à bit.

Ok maintenant pour le code :

template <typename T>
class TripleBuffer
{

public:

  TripleBuffer<T>();
  TripleBuffer<T>(const T& init);

  // non-copyable behavior
  TripleBuffer<T>(const TripleBuffer<T>&) = delete;
  TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;

  T snap() const; // get the current snap to read
  void write(const T newT); // write a new value
  bool newSnap(); // swap to the latest value, if any
  void flipWriter(); // flip writer positions dirty / clean

  T readLast(); // wrapper to read the last available element (newSnap + snap)
  void update(T newT); // wrapper to update with a new element (write + flipWriter)

private:

  bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
  uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
  uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes

  // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
  // newWrite   = (flags & 0x40)
  // dirtyIndex = (flags & 0x30) >> 4
  // cleanIndex = (flags & 0xC) >> 2
  // snapIndex  = (flags & 0x3)
  mutable atomic_uint_fast8_t flags;

  T buffer[3];
};

mise en œuvre :

template <typename T>
TripleBuffer<T>::TripleBuffer(){

  T dummy = T();

  buffer[0] = dummy;
  buffer[1] = dummy;
  buffer[2] = dummy;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){

  buffer[0] = init;
  buffer[1] = init;
  buffer[2] = init;

  flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}

template <typename T>
T TripleBuffer<T>::snap() const{

  return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}

template <typename T>
void TripleBuffer<T>::write(const T newT){

  buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}

template <typename T>
bool TripleBuffer<T>::newSnap(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  do {
    if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
      return false;
  } while(!flags.compare_exchange_weak(flagsNow,
                                       swapSnapWithClean(flagsNow),
                                       memory_order_release,
                                       memory_order_consume));
  return true;
}

template <typename T>
void TripleBuffer<T>::flipWriter(){

  uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
  while(!flags.compare_exchange_weak(flagsNow,
                                     newWriteSwapCleanWithDirty(flagsNow),
                                     memory_order_release,
                                     memory_order_consume));
}

template <typename T>
T TripleBuffer<T>::readLast(){
    newSnap(); // get most recent value
    return snap(); // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT){
    write(newT); // write new value
    flipWriter(); // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
    // check if the newWrite bit is 1
    return ((flags & 0x40) != 0);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
    // swap snap with clean
    return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}

template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
    // set newWrite bit to 1 and swap clean with dirty 
    return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}

Comme vous pouvez le constater, j'ai décidé d'utiliser une Libérer-Consommer pour l'ordonnancement de la mémoire. Le site Communiqué de presse (memory_order_release) pour le magasin assure qu'aucune écriture dans le thread actuel ne peut être réordonnée. après le magasin. De l'autre côté, le Consommez assure qu'aucune lecture dans le thread actuel dépendant de la valeur actuellement chargée ne peut être réordonnée. avant cette charge. Cela garantit que les écritures sur les variables dépendantes dans d'autres threads qui libèrent la même variable atomique sont visibles dans le thread actuel.

Si je comprends bien, puisque je n'ai besoin que des drapeaux pour être atomiquement fixés, les opérations sur les autres variables qui n'affectent pas directement les drapeaux peuvent être réordonnées librement par le compilateur, ce qui permet davantage d'optimisations. En lisant certains documents sur le nouveau modèle de mémoire, je suis également conscient que ces atomiques relaxés n'auront un effet notable que sur des plateformes telles que ARM et POWER (ils ont été introduits principalement à cause d'elles). Puisque je vise ARM, je pense que je pourrais bénéficier de ces opérations et être capable d'extraire un peu plus de performance.

Maintenant, la question :

Est-ce que j'utilise correctement l'ordre détendu Release-Consume pour ce problème spécifique ?

Merci,

André

PS : Désolé pour ce long post, mais j'ai pensé qu'un contexte décent était nécessaire pour une meilleure vision du problème.

EDIT : Implémentation des suggestions de @Yakk :

  • Correction de flags continuer à lire newSnap() y flipWriter() qui utilisaient l'affectation directe, donc l'affectation par défaut load(std::memory_order_seq_cst) .
  • Déplacement des opérations de bidouillage de bits vers des fonctions dédiées pour plus de clarté.
  • Ajouté bool le type de retour à newSnap() retourne maintenant false quand il n'y a rien de nouveau et true sinon.
  • Défini la classe comme non copiable en utilisant = delete puisque les constructeurs de copie et d'assignation n'étaient pas sûrs si l'élément TripleBuffer était utilisé.

EDIT 2 : Correction de la description, qui était incorrecte (Merci @Useless). Il s'agit du consommateur qui demande un nouveau Snap et lit à partir de l'index du Snap (pas le "rédacteur"). Désolé pour la distraction et merci à Useless de l'avoir signalé.

EDIT 3 : Optimisé le newSnap() y flipriter() selon les suggestions de @Display Name, supprimant ainsi 2 fonctions redondantes. load() par cycle de boucle.

3voto

Display Name Points 495

Pourquoi chargez-vous deux fois l'ancienne valeur des drapeaux dans vos boucles CAS ? La première fois, c'est par flags.load() et le second par le compare_exchange_weak() qui, selon la norme, en cas d'échec du CAS, chargera la valeur précédente dans le premier argument, qui dans ce cas est flagsNow.

Selon http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange , " Sinon, charge la valeur réelle stockée dans *this dans expected (effectue l'opération de chargement). " Donc ce que votre boucle fait, c'est qu'en cas d'échec, compare_exchange_weak() rechargements flagsNow puis la boucle se répète, et la première instruction le charge une nouvelle fois, immédiatement après le chargement par compare_exchange_weak() . Il me semble que votre boucle devrait plutôt avoir la charge tirée à l'extérieur de la boucle. Par exemple, newSnap() serait :

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
    if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));

y flipWriter() :

uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));

1voto

Oui, il y a une différence entre memory_order_acquire et memory_order_consume, mais vous ne la remarquerez pas si vous l'utilisez à environ 180 par seconde. Vous pouvez exécuter mon test avec m2 = memory_order_consume si vous voulez connaître la réponse en chiffres. Il suffit de changer producer_or_consumer_Thread par quelque chose comme ça :

TripleBuffer <int> tb;

void producer_or_consumer_Thread(void *arg)
{
    struct Arg * a = (struct Arg *) arg;
    bool succeeded = false;
    int i = 0, k, kold = -1, kcur;

    while (a->run)
    {
        while (a->wait) a->is_waiting = true; // busy wait
        if (a->producer)
        {
            i++;
            tb.update(i);
            a->counter[0]++;
        }
        else
        {
            kcur = tb.snap();
            if (kold != -1 && kcur != kold) a->counter[1]++;
            succeeded = tb0.newSnap();
            if (succeeded)
            {
                k = tb.readLast();
                if (kold == -1)
                    kold = k;
                else if (kold = k + 1)
                    kold = k;
                else
                    succeeded = false;
            }
            if (succeeded) a->counter[0]++;   
        }
    }
    a->is_waiting =  true;
}

Résultat du test :

_#_  __Produced __Consumed _____Total
  1    39258150   19509292   58767442
  2    24598892   14730385   39329277
  3    10615129   10016276   20631405
  4    10617349   10026637   20643986
  5    10600334    9976625   20576959
  6    10624009   10069984   20693993
  7    10609040   10016174   20625214
  8    25864915   15136263   41001178
  9    39847163   19809974   59657137
 10    29981232   16139823   46121055
 11    10555174    9870567   20425741
 12    25975381   15171559   41146940
 13    24311523   14490089   38801612
 14    10512252    9686540   20198792
 15    10520211    9693305   20213516
 16    10523458    9720930   20244388
 17    10576840    9917756   20494596
 18    11048180    9528808   20576988
 19    11500654    9530853   21031507
 20    11264789    9746040   21010829

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X