83 votes

Comment implémenter un tampon circulaire en C ?

J'ai besoin d'un tampon circulaire de taille fixe (sélectionnable à l'exécution lors de sa création, et non à la compilation) pouvant contenir des objets de n'importe quel type et devant être très haute performance. Je ne pense pas qu'il y aura des problèmes de contention des ressources car, bien qu'il s'agisse d'un environnement multi-tâches embarqué, il s'agit d'un environnement coopératif, de sorte que les tâches elles-mêmes peuvent gérer cela.

Mon idée initiale était de stocker un simple struct dans le tampon qui contiendrait le type (simple enum/define) et un pointeur void vers la charge utile mais je veux que ce soit aussi rapide que possible donc je suis ouvert aux suggestions qui impliquent de contourner le tas.

En fait, je suis heureux de contourner toute la bibliothèque standard pour la vitesse brute - de ce que j'ai vu du code, il n'est pas fortement optimisé pour le CPU : on dirait qu'ils ont juste compilé du code C pour des choses telles que strcpy() et autres, il n'y a pas d'assemblage codé à la main.

Tout code ou idée serait grandement apprécié. Les opérations requises sont :

  • créer un tampon de taille spécifique.
  • mis à la queue.
  • de la tête.
  • retourner le compte.
  • supprimer un tampon.

1 votes

Avez-vous besoin d'un tampon circulaire ou d'une file d'attente ? Les opérations requises font penser à une file d'attente. J'admets qu'avec l'exigence d'une taille fixe, l'utilisation d'un tampon circulaire est logique, mais je ne suis pas sûr que le titre de la question reflète votre question réelle.

1 votes

Je suis ouvert à d'autres structures de données si vous pensez qu'elles peuvent être plus rapides, mais je suis raisonnablement certain qu'un tampon circulaire fixe en mémoire sera plus performant que malloc/free des éléments de la file d'attente. Bien que je suppose que je dois faire malloc/free de la charge utile de toute façon : si je pouvais faire un seul malloc pour l'élément et la charge utile, ça pourrait valoir le coup.

0 votes

"si vous pensez qu'ils peuvent être plus rapides" ? - Je pense qu'il faut faire des tests. Au fait, qu'entendez-vous par "très hautes performances" ?

94voto

Adam Rosenfield Points 176408

La solution la plus simple serait de garder trace de la taille de l'élément et du nombre d'éléments, puis de créer un tampon du nombre d'octets approprié :

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}

5 votes

Solution très standard - exactement conforme aux spécifications que le PO a incluses comme ce qu'il essayait d'éviter :P

0 votes

Adam, cette solution suppose que les éléments de même taille doivent toujours occuper la même position dans le tampon. Je crois que le PO a demandé que toute entrée donnée, stockée à n'importe quel endroit dans le tampon, soit l'un des 6 types de données de taille différente. Cela laisse 2 alternatives. Utiliser calloc() et realloc() pour chaque donnée nouvellement arrivée, ou allouer un tampon suffisamment grand pour contenir le plus grand type de donnée à chaque position dans le tampon. Cette dernière approche, si elle est réalisable, serait plus rapide et plus propre.

0 votes

Pourriez-vous fournir un moyen d'initier un tampon circulaire ? En utilisant ce code : circular_buffer *cb ; cb_init(cb, 20, 4) ; j'obtiens cette erreur Le processus s'est terminé avec le code de sortie 139 (interrompu par le signal 11 : SIGSEGV) Merci d'avance et désolé d'écrire du code dans les commentaires.

16voto

alexbw Points 1027
// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}

Tant que la longueur de votre tampon circulaire est une puissance de deux, l'opération binaire "&", incroyablement rapide, fera le tour de votre index pour vous. Dans mon application, j'affiche un segment d'audio à l'utilisateur à partir d'un tampon circulaire d'audio acquis à partir d'un microphone.

Je m'assure toujours que la quantité maximale d'audio pouvant être affichée à l'écran est bien inférieure à la taille du tampon circulaire. Sinon, vous risquez de lire et d'écrire dans le même chunk. Cela donnerait probablement des artefacts d'affichage bizarres.

0 votes

J'aime l'utilisation du & et l'utilisation de ringBuffer->sizeOfBuffer comme masque de bits. Je suppose que le problème des "artefacts d'affichage bizarres" est dû au fait que l'on écrit dans la FIFO sans vérifier si l'on va écrire sur la queue avant d'effectuer l'écriture ?

1 votes

J'ai fait quelques tests, et je pense que les deux façons d'utiliser % et & sont les mêmes avec cet extrait : uint8_t tmp1,tmp2; tmp1 = (34 + 1) & 31; tmp2 = (35 ) % 32; printf("%d %d",tmp1,tmp2); Alors, quelle est la différence réelle ou c'est juste un style de codage ?

12voto

dewtell Points 721

Pouvez-vous énumérer les types nécessaires au moment où vous codez le tampon, ou devez-vous être capable d'ajouter des types au moment de l'exécution via des appels dynamiques ? Dans le premier cas, je créerais le tampon sous la forme d'un tableau de n structures alloué au tas, où chaque structure est constituée de deux éléments : une balise d'énumération identifiant le type de données, et une union de tous les types de données. Ce que vous perdez en termes de stockage supplémentaire pour les petits éléments, vous le compensez en n'ayant pas à gérer l'allocation/désallocation et la fragmentation de la mémoire qui en résulte. Il suffit ensuite de garder la trace des indices de début et de fin qui définissent les éléments de tête et de queue du tampon, et de s'assurer de calculer le mod n lors de l'incrémentation/décrémentation des indices.

0 votes

Les types nécessaires sont énumérés, oui. Il n'y en a que six environ et cela ne changera jamais. Mais les files d'attente qui contiennent les éléments doivent être capables de contenir les six types. Je ne suis pas sûr du stockage d'un élément entier dans la file d'attente plutôt qu'un pointeur - cela signifie copier des éléments (plusieurs centaines d'octets) plutôt qu'un pointeur. Mais j'aime l'idée d'une union - ma principale préoccupation ici est la vitesse plutôt que la mémoire (nous avons suffisamment de cette dernière, mais l'unité centrale est un choc :-).

0 votes

Ta réponse m'a donné une bonne idée : la mémoire des éléments pourrait être pré-allouée par malloc() et distribuée par un mymalloc() spécialement conçu pour gérer ces blocs de mémoire. Et je pourrais toujours utiliser des pointeurs. +1 pour ça.

0 votes

Vous pouvez avoir besoin ou non de faire des copies supplémentaires, en fonction du modèle d'accès aux données. Si vous pouviez construire les éléments sur place, et les référencer alors qu'ils sont encore dans le tampon avant de le vider, il n'y aurait peut-être pas de copie supplémentaire. Mais il est certainement plus sûr et plus flexible de les distribuer à partir de votre propre allocateur, et d'utiliser un tableau séparé de pointeurs (ou d'indices) comme tampon.

10voto

RocketRoy Points 444

D'abord, le titre. Vous n'avez pas besoin de l'arithmétique modulo pour envelopper le tampon si vous utilisez des bits ints pour contenir les "pointeurs" de tête et de queue, et si vous les dimensionnez de manière à ce qu'ils soient parfaitement synchronisés. IE : 4096 rempli dans un 12-bit unsigned int est 0 tout seul, non molesté en aucune façon. L'élimination de l'arithmétique modulo, même pour les puissances de 2, double la vitesse - presque exactement.

10 millions d'itérations de remplissage et de vidange d'un tampon de 4096 éléments de données de tout type prennent 52 secondes sur mon Dell XPS 8500 de 3e génération i7 en utilisant le compilateur C++ de Visual Studio 2010 avec l'inlining par défaut, et 1/8192e de ce temps pour servir une donnée.

Je réécrirais les boucles de test dans main() pour qu'elles ne contrôlent plus le flux - qui est, et devrait être, contrôlé par les valeurs de retour indiquant que le tampon est plein ou vide, et les instructions break ; correspondantes. IE : le remplisseur et le vidangeur devraient pouvoir se cogner l'un contre l'autre sans corruption ou instabilité. A un moment donné, j'espère pouvoir utiliser ce code en multithread, ce qui rendra ce comportement crucial.

Le QUEUE_DESC (descripteur de file d'attente) et la fonction d'initialisation obligent tous les tampons dans ce code à être une puissance de 2. Le schéma ci-dessus ne fonctionnera PAS autrement. A ce propos, notez que QUEUE_DESC n'est pas codé en dur, il utilise une constante manifeste (#define BITS_ELE_KNT) pour sa construction. (Je suppose qu'une puissance de 2 est une flexibilité suffisante ici).

Pour rendre la taille du tampon sélectionnable en cours d'exécution, j'ai essayé différentes approches (non montrées ici), et me suis contenté d'utiliser des USHRT pour Head, Tail, EleKnt capables de gérer un tampon FIFO [USHRT]. Pour éviter l'arithmétique modulo, j'ai créé un masque pour && avec Head, Tail, mais ce masque s'avère être (EleKnt -1), donc utilisez simplement cela. L'utilisation de USHRTS au lieu de bit ints a augmenté les performances de ~ 15% sur une machine silencieuse. Les cœurs des CPU Intel ont toujours été plus rapides que leurs bus, donc sur une machine occupée et partagée, l'empaquetage de vos structures de données vous permet d'être chargé et d'exécuter avant les autres threads en compétition. Compromis.

Notez que le stockage réel du tampon est alloué sur le tas avec calloc(), et le pointeur est à la base de la structure, donc la structure et le pointeur ont EXACTEMENT la même adresse. IE ; aucun offset ne doit être ajouté à l'adresse de la structure pour bloquer les registres.

Dans le même ordre d'idées, toutes les variables liées à la gestion du tampon sont physiquement adjacentes au tampon, liées à la même structure, de sorte que le compilateur peut créer un magnifique langage d'assemblage. Il faut tuer l'optimisation inline pour voir de l'assembleur, car sinon il est écrasé dans l'oubli.

Pour supporter le polymorphisme de tout type de données, j'ai utilisé memcpy() au lieu des assignations. Si vous avez seulement besoin de la flexibilité de supporter un type de variable aléatoire par compilation, alors ce code fonctionne parfaitement.

Pour le polymorphisme, il suffit de connaître le type et son besoin de stockage. Le tableau de descripteurs DATA_DESC permet de garder la trace de chaque donnée placée dans QUEUE_DESC.pBuffer afin de pouvoir la récupérer correctement. Je me contenterais d'allouer suffisamment de mémoire pBuffer pour contenir tous les éléments du plus grand type de données, mais je garderais la trace de la quantité de mémoire qu'une donnée donnée utilise réellement dans DATA_DESC.dBytes. L'alternative est de réinventer un gestionnaire de tas.

Cela signifie que le UCHAR *pBuffer de QUEUE_DESC aurait un tableau compagnon parallèle pour garder la trace du type et de la taille des données, tandis que l'emplacement de stockage d'une donnée dans pBuffer resterait tel qu'il est actuellement. Le nouveau membre serait quelque chose comme DATA_DESC *pDataDesc, ou, peut-être, DATA_DESC DataDesc[2^BITS_ELE_KNT] si vous pouvez trouver un moyen de battre votre compilateur en soumission avec une telle référence directe. Calloc() est toujours plus flexible dans ces situations.

Vous auriez toujours memcpy() dans Q_Put(),Q_Get, mais le nombre d'octets réellement copiés serait déterminé par DATA_DESC.dBytes, et non par QUEUE_DESC.EleBytes. Les éléments sont potentiellement tous de types/tailles différents pour tout put ou get donné.

Je pense que ce code répond aux exigences de vitesse et de taille de la mémoire tampon, et qu'il peut être adapté pour répondre à l'exigence de 6 types de données différents. J'ai laissé les nombreux tests, sous forme d'instructions printf(), afin que vous puissiez vous assurer (ou non) que le code fonctionne correctement. Le générateur de nombres aléatoires démontre que le code fonctionne pour n'importe quelle combinaison tête/queue aléatoire.

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}

1 votes

Il existe une approche encore plus rapide pour enrouler le vecteur plat en un anneau. Utilisez une valeur sentinelle à la fin du tampon, et lorsque pHead, ou pTail sont == ce pointeur, réinitialisez chacun d'eux, à tour de rôle, vers buff[0]. Environ 70% du temps pour les puissances-de-deux, et 33% du temps pour les non puissances-de-deux. Si le temps le permet, je vais mettre un peu de code.

7 votes

Votre code est cassé pour plusieurs raisons. Premièrement, la comparaison Q->Tail == (Q->Head + Q->EleKnt) dans votre Q_Put ne retournera jamais vrai puisque Q->Head + Q->EleKnt n'est pas une addition modulo, ce qui signifie que vous allez simplement écraser la tête lors de la prochaine écriture. Si EleKnt es 4096 c'est une valeur que votre Tail ne sera jamais atteint. Ensuite, l'utilisation de cette file d'attente comme file d'attente de producteur/consommateur provoquerait des ravages, car vos Q_Put "écrit d'abord, pose des questions ensuite" et met à jour le site de l Tail même après avoir réalisé que la file d'attente est pleine. L'appel suivant à Q_Put va simplement écraser l'en-tête comme si rien ne s'était passé.

2 votes

Vous devez analyser les différents algorithmes présentés sur la page Wikipedia de Tampon circulaire c'est-à-dire le distinction entre tampon plein ou vide problème. Avec votre approche actuelle, vous devez garder un élément de moins dans votre tampon pour savoir la différence entre plein et vide.

9voto

SoloPilot Points 319

Voici une solution simple en C. Supposez que les interruptions soient désactivées pour chaque fonction. Pas de polymorphisme et autres, juste du bon sens.


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}

1 votes

PIn > pEnd doit être utilisé au lieu de pIn >= pEnd sinon vous ne remplirez jamais le dernier emplacement du buf ; il en va de même pour pOut >= pEnd

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X