76 votes

Tampon circulaire sans verrouillage

Je suis en train de concevoir un système qui se connecte à un ou plusieurs flux de données, effectue une analyse des données et déclenche des événements en fonction du résultat. Dans une configuration typique de producteur/consommateur multithread, j'aurai plusieurs threads de producteur mettant des données dans une file d'attente, et plusieurs threads de consommateur lisant les données, et les consommateurs sont seulement intéressés par le dernier point de données plus un nombre n de points. Les threads du producteur devront se bloquer si le consommateur lent ne peut pas suivre, et bien sûr les threads du consommateur se bloqueront lorsqu'il n'y aura pas de mises à jour non traitées. L'utilisation d'une file d'attente concurrente typique avec un verrou de lecteur/écrivain fonctionnera bien mais le taux de données entrant pourrait être énorme, donc je voulais réduire mes frais de verrouillage, en particulier les verrous d'écriture pour les producteurs. Je pense qu'un tampon circulaire sans verrou est ce dont j'avais besoin.

Maintenant, deux questions :

  1. Le tampon circulaire sans verrou est-il la solution ?

  2. Si c'est le cas, avant que je ne crée mon propre système, connaissez-vous une implémentation publique qui répondrait à mes besoins ?

Toute indication sur l'implémentation d'un tampon circulaire sans verrou est la bienvenue.

BTW, je fais ça en C++ sur Linux.

Quelques informations supplémentaires :

Le temps de réponse est critique pour mon système. Idéalement, les fils consommateurs voudront voir les mises à jour arriver le plus rapidement possible, car un retard supplémentaire d'une milliseconde pourrait rendre le système sans valeur, ou avec une valeur bien moindre.

L'idée de conception vers laquelle je penche est un tampon circulaire semi-libre où le thread producteur met les données dans le tampon aussi vite qu'il le peut, appelons-le la tête du tampon A, sans blocage jusqu'à ce que le tampon soit plein, lorsque A rencontre la fin du tampon Z. Les threads consommateurs détiendront chacun deux pointeurs vers le tampon circulaire, P et P n où P est la tête de mémoire tampon locale du thread et P n est le nième élément après P. Chaque thread de consommateur fera avancer ses P et P n une fois qu'il a fini de traiter le P actuel et que le pointeur de fin de tampon Z est avancé avec le P le plus lent n . Lorsque P rattrape A, ce qui signifie qu'il n'y a plus de nouvelle mise à jour à traiter, le consommateur tourne et attend que A avance à nouveau. Si le thread du consommateur tourne trop longtemps, il peut être mis en veille et attendre une variable de condition, mais je suis d'accord avec le fait que le consommateur prenne un cycle CPU en attendant la mise à jour car cela n'augmente pas ma latence (j'aurai plus de cœurs CPU que de threads). Imaginez que vous ayez une piste circulaire, et que le producteur soit devant un groupe de consommateurs, la clé est de régler le système de sorte que le producteur soit généralement juste un peu en avance sur les consommateurs, et la plupart de ces opérations peuvent être effectuées en utilisant des techniques sans verrou. Je comprends qu'il n'est pas facile d'obtenir les détails de l'implémentation correcte... ok, très difficile, c'est pourquoi je veux apprendre des erreurs des autres avant de faire les miennes.

0 votes

Je pense qu'il serait utile d'esquisser l'API que vous voulez que cette structure de données implémente.

1 votes

Une chose que j'ai apprise, c'est de prendre de gros morceaux de travail. Je ne connais pas la taille de vos éléments de travail, mais vous pouvez gagner en efficacité si vous pouvez produire de plus gros morceaux et consommer de plus gros morceaux. Vous pouvez également l'augmenter en consommant des morceaux de taille variable afin que les consommateurs ne terminent pas tous en même temps et ne se disputent pas la file d'attente des données.

0 votes

Il faut également se demander si vous avez besoin d'un seul tampon ou d'une série de tampons. Vous pourriez avoir des paires producteur/consommateur partageant un tampon, et lorsqu'un tampon est plein, le producteur ou le consommateur passe temporairement à un autre tampon ouvert. C'est une forme de vol de travail.

5voto

Akin Ocal Points 321

Je ne suis pas un expert des modèles de mémoire matérielle et des structures de données sans verrou et j'ai tendance à éviter de les utiliser dans mes projets et à opter pour des structures de données verrouillées traditionnelles.

Cependant, j'ai récemment remarqué que la vidéo : File d'attente SPSC sans verrouillage basée sur un tampon en anneau

Il s'agit d'une bibliothèque Java open source haute performance appelée LMAX distruptor utilisée par un système de trading : LMAX Distructeur

Sur la base de la présentation ci-dessus, vous rendez les pointeurs head et tail atomiques et vérifiez de manière atomique la condition où head attrape tail par derrière ou vice versa.

Vous pouvez voir ci-dessous une implémentation très basique de C++11 :

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}

4voto

Nikolai N Fetissov Points 52093

Une technique utile pour réduire la contention consiste à hacher les éléments dans plusieurs files d'attente et à faire en sorte que chaque consommateur se consacre à un "sujet".

Pour le nombre le plus récent d'éléments qui intéressent vos consommateurs, vous ne voulez pas verrouiller toute la file d'attente et l'itérer pour trouver un élément à remplacer - publiez simplement les éléments en N-tuples, c'est-à-dire tous les N éléments récents. Des points bonus pour une implémentation où le producteur bloquerait la totalité de la file d'attente (lorsque les consommateurs ne peuvent pas suivre) avec un délai d'attente, mettant à jour son cache de tuple local - de cette façon, vous ne mettez pas de pression en retour sur la source de données.

0 votes

J'ai également envisagé le modèle de threading patron/travailleur dans lequel le thread patron multidiffuse les mises à jour aux files d'attente privées des threads travailleurs. Je pense que c'est plus ou moins la direction que vous prenez. Je dois y réfléchir davantage, mais lorsque je l'ai envisagé, le modèle patron/travailleur semblait avoir trop de frais généraux car tous les travailleurs doivent recevoir les mêmes mises à jour.

1 votes

Pas exactement - ce que je veux dire dans le premier point, c'est qu'il faut découper le flux entrant pour que tous les threads ne se disputent pas le même verrou/la même file d'attente. Le deuxième point est la mise en cache du côté producteur pour s'adapter aux pics de l'entrée et aussi permettre aux consommateurs lents de ne pas arrêter le producteur.

0 votes

Mais la logique d'entreprise exige que tous les threads de travail connaissent toutes les données qui arrivent. Il n'y a qu'un seul et unique type de données entrantes et chaque point de données est d'égale importance, je ne peux donc pas vraiment découper mon flux entrant et avoir différentes données dans différentes files d'attente. Je ne peux donc pas vraiment découper mon flux entrant et avoir des données différentes dans des files d'attente différentes. La mise en place du côté producteur et le regroupement des mises à jour du modèle de données pour éviter l'accaparement ont été faits et ce n'était pas suffisant pour gérer la charge.

4voto

rama-jka toti Points 1174

La file d'attente de Sutter est sous-optimale et il le sait. The Art of Multicore programming est une excellente référence mais ne faites pas confiance aux gars de Java sur les modèles de mémoire, point final. Les liens de Ross ne vous apporteront pas de réponse définitive car ils avaient leurs bibliothèques dans tel ou tel problème, etc.

Faire de la programmation sans verrou, c'est chercher les ennuis, à moins que vous ne vouliez passer beaucoup de temps sur quelque chose que vous êtes clairement en train de sur-ingénierie avant de résoudre le problème (à en juger par la description de celui-ci, c'est une folie commune de "chercher la perfection" dans la cohérence du cache). Cela prend des années et conduit à ne pas résoudre les problèmes d'abord et à optimiser ensuite, une maladie courante.

3voto

bittnkr Points 89

Il y a quelque temps, j'ai trouvé une bonne solution à ce problème. Je crois que c'est la plus petite trouvée jusqu'à présent.

Le référentiel contient un exemple d'utilisation pour créer N threads (lecteurs et écrivains) et leur faire partager un seul siège.

J'ai fait quelques benchmarks, sur l'exemple de test et j'ai obtenu les résultats suivants (en millions d'ops/sec) :

Par taille de tampon

throughput

Par nombre de fils

enter image description here

Remarquez que le nombre de threads ne change pas le débit.

Je pense que c'est la solution ultime à ce problème. Elle fonctionne et est incroyablement rapide et simple. Même avec des centaines de threads et une file d'attente d'une seule position. Elle peut être utilisée comme un pipeline entre les threads, en allouant de l'espace à l'intérieur de la file d'attente.

Pouvez-vous le casser ?

3voto

zvrba Points 14028

Je suis d'accord avec cet article et recommandent de ne pas utiliser de structures de données sans verrou. Un article relativement récent sur les files d'attente fifo sans verrou est le suivant ce Il existe également une thèse de doctorat sur Chalmers concernant les structures de données sans verrou (j'ai perdu le lien). Cependant, vous n'avez pas dit quelle est la taille de vos éléments - les structures de données sans verrou ne fonctionnent efficacement qu'avec des éléments de la taille d'un mot, donc vous devrez allouer dynamiquement vos éléments s'ils sont plus grands qu'un mot machine (32 ou 64 bits). Si vous allouez dynamiquement les éléments, vous déplacez le goulot d'étranglement (supposé, puisque vous n'avez pas profilé votre programme et que vous faites essentiellement de l'optimisation prématurée) vers l'allocateur de mémoire, vous avez donc besoin d'un allocateur de mémoire sans verrou, par exemple, Débit des cours d'eau et l'intégrer à votre application.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X