12 votes

Optimisation de la conversion C++

J'aimerais savoir s'il existe un moyen plus rapide d'effectuer ma conversion audio que de parcourir toutes les valeurs une par une et de les diviser par 32768.

void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
{
    for ( int i = 0; i <=uIntegers.size()-1;i++)
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

Mon approche fonctionne bien, mais elle pourrait être plus rapide. Cependant, je n'ai pas trouvé de moyen de l'accélérer.

Merci pour votre aide !

3voto

GuyGreer Points 4240

Si votre tableau est assez grand, il peut être intéressant de paralléliser cette boucle for. La méthode OpenMP parallèle pour est ce que j'utiliserais.
La fonction serait alors :

    void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
    {
        #pragma omp parallel for
        for (int i = 0; i < uIntegers.size(); i++)
        {
            uDoubles[i] = uIntegers[i] / 32768.0;
        }
    }

avec gcc, vous devez passer -fopenmp lorsque vous compilez pour le pragma à utiliser, sous MSVC c'est /openmp . Étant donné que la création de threads entraîne une surcharge notable, cette méthode ne sera plus rapide que si vous traitez des tableaux de grande taille, YMMV.

3voto

Adam Points 654

Pour une vitesse maximale, il est préférable de convertir plus d'une valeur par itération de la boucle. Le moyen le plus simple d'y parvenir est d'utiliser la technologie SIMD. Voici en gros comment vous le feriez avec SSE2 :

void CAudioDataItem::Convert(const vector<int>&uIntegers, vector<double> &uDoubles)
{
    __m128d scale = _mm_set_pd( 1.0 / 32768.0, 1.0 / 32768.0 );
    int i = 0;
    for ( ; i < uIntegers.size() - 3; i += 4)
    {
        __m128i x = _mm_loadu_si128(&uIntegers[i]);
        __m128i y = _mm_shuffle_epi32(x, _MM_SHUFFLE(2,3,0,0) );
        __m128d dx = _mm_cvtepi32_pd(x);
        __m128d dy = _mm_cvtepi32_pd(y);
        dx = _mm_mul_pd(dx, scale);
        dy = _mm_mul_pd(dy, scale);
        _mm_storeu_pd(dx, &uDoubles[i]);
        _mm_storeu_pd(dy, &uDoubles[i + 2]);
    }
    // Finish off the last 0-3 elements the slow way
    for ( ; i < uIntegers.size(); i ++)
    {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

Nous traitons quatre entiers par itération de la boucle. Comme nous ne pouvons faire tenir que deux doubles dans les registres, il y a un peu de travail en double, mais le déroulement supplémentaire améliorera les performances, sauf si les tableaux sont minuscules.

Le fait de remplacer les types de données par des types plus petits (par exemple short et float) devrait également améliorer les performances, car ils réduisent la bande passante de la mémoire et vous pouvez faire tenir quatre floats dans un registre SSE. Pour les données audio, vous ne devriez pas avoir besoin de la précision d'un double.

Notez que j'ai utilisé des chargements et des stockages non alignés. Les chargements et stockages alignés seront légèrement plus rapides si les données sont réellement alignées (ce qui n'est pas le cas par défaut, et il est difficile de faire en sorte que les données soient alignées à l'intérieur d'un std::vector).

3voto

Votre fonction est hautement parallélisable. Sur les CPU Intel modernes, il existe trois façons indépendantes de paralléliser : Le parallélisme au niveau des instructions (ILP), le parallélisme au niveau des threads (TLP) et le SIMD. J'ai pu utiliser les trois pour obtenir de gros gains dans votre fonction. Les résultats dépendent cependant du compilateur. Le boost est bien moindre en utilisant GCC puisqu'il vectorise déjà la fonction. Voir le tableau des chiffres ci-dessous.

Cependant, le principal facteur limitant de votre fonction est que sa complexité temporelle est seulement O(n) et donc il y a une chute drastique de l'efficacité lorsque la taille de la matrice sur laquelle vous vous déplacez franchit la limite de chaque niveau de cache. . Si vous regardez par exemple la multiplication de matrices denses de grande taille (GEMM), il s'agit d'une opération O(n^3) et si l'on fait les choses correctement (en utilisant par exemple la mise en mosaïque de boucles), la hiérarchie du cache n'est pas un problème : vous pouvez vous approcher du maximum de flops/s même pour de très grandes matrices (ce qui semble indiquer que GEMM est l'une des pensées d'Intel lorsqu'ils conçoivent le CPU). La façon de résoudre ce problème dans votre cas est de trouver un moyen de faire votre fonction sur un bloc de cache L1 juste après/avant de faire une opération plus complexe (par exemple qui va en O(n^2)) et ensuite de passer à un autre bloc L1. Bien sûr, je ne sais pas ce que vous faites donc je ne peux pas le faire.

L'ILP est partiellement réalisée pour vous par le matériel du CPU. Cependant, les dépendances de boucles portées limitent souvent l'ILP, il est donc utile de dérouler les boucles pour profiter pleinement de l'ILP. Pour TLP j'utilise OpenMP, et pour SIMD j'utilise AVX (cependant le code ci-dessous fonctionne aussi pour SSE). J'ai utilisé une mémoire alignée sur 32 octets et je me suis assuré que le tableau était un multiple de 8 afin qu'aucun nettoyage ne soit nécessaire.

Voici les résultats de Visual Studio 2012 64bit avec AVX et OpenMP (mode release évidemment) SandyBridge EP 4 cœurs (8 HW threads) @3.6 GHz. La variable n est le nombre d'articles. Je répète également la fonction plusieurs fois, ce qui est inclus dans le temps total. La fonction convert_vec4_unroll2_openmp donne les meilleurs résultats, sauf dans la région L1. Vous pouvez également voir clairement que l'efficacité diminue de manière significative chaque fois que vous passez à un nouveau niveau de cache, mais même pour la mémoire principale, elle est toujours meilleure.

l1 chache, n 2752, repeat 300000
    covert time 1.34, error 0.000000
    convert_vec4 time 0.16, error 0.000000
    convert_vec4_unroll2 time 0.16, error 0.000000
    convert_vec4_unroll2_openmp time 0.31, error 0.000000

l2 chache, n 21856, repeat 30000
    covert time 1.14, error 0.000000
    convert_vec4 time 0.24, error 0.000000
    convert_vec4_unroll2 time 0.24, error 0.000000
    convert_vec4_unroll2_openmp time 0.12, error 0.000000

l3 chache, n 699072, repeat 1000
    covert time 1.23, error 0.000000
    convert_vec4 time 0.44, error 0.000000
    convert_vec4_unroll2 time 0.45, error 0.000000
    convert_vec4_unroll2_openmp time 0.14, error 0.000000

main memory , n 8738144, repeat 100
    covert time 1.56, error 0.000000
    convert_vec4 time 0.95, error 0.000000
    convert_vec4_unroll2 time 0.89, error 0.000000
    convert_vec4_unroll2_openmp time 0.51, error 0.000000

Résultats avec g++ foo.cpp -mavx -fopenmp -ffast-math -O3 sur un i5-3317 (ivy bridge) @ 2.4 GHz 2 cœurs (4 HW threads). GCC semble vectoriser cela et le seul avantage vient d'OpenMP (qui, cependant, donne un résultat pire dans la région L1).

l1 chache, n 2752, repeat 300000
    covert time 0.26, error 0.000000
    convert_vec4 time 0.22, error 0.000000
    convert_vec4_unroll2 time 0.21, error 0.000000
    convert_vec4_unroll2_openmp time 0.46, error 0.000000

l2 chache, n 21856, repeat 30000
    covert time 0.28, error 0.000000
    convert_vec4 time 0.27, error 0.000000
    convert_vec4_unroll2 time 0.27, error 0.000000
    convert_vec4_unroll2_openmp time 0.20, error 0.000000

l3 chache, n 699072, repeat 1000
    covert time 0.80, error 0.000000
    convert_vec4 time 0.80, error 0.000000
    convert_vec4_unroll2 time 0.80, error 0.000000
    convert_vec4_unroll2_openmp time 0.83, error 0.000000

main memory chache, n 8738144, repeat 100
    covert time 1.10, error 0.000000
    convert_vec4 time 1.10, error 0.000000
    convert_vec4_unroll2 time 1.10, error 0.000000
    convert_vec4_unroll2_openmp time 1.00, error 0.000000

Voici le code. J'utilise la classe vectorielle http://www.agner.org/optimize/vectorclass.zip pour faire du SIMD. Cela utilisera soit AVX pour écrire 4 doubles à la fois, soit SSE pour écrire 2 doubles à la fois.

#include <stdlib.h>
#include <stdio.h>
#include <omp.h>
#include "vectorclass.h"

void convert(const int *uIntegers, double *uDoubles, const int n) {
    for ( int i = 0; i<n; i++) {
        uDoubles[i] = uIntegers[i] / 32768.0;
    }
}

void convert_vec4(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0/32768;
    for ( int i = 0; i<n; i+=4) {
        Vec4i u4i = Vec4i().load(&uIntegers[i]);
        Vec4d u4d  = to_double(u4i);
        u4d*=div;
        u4d.store(&uDoubles[i]);
    }
}

void convert_vec4_unroll2(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0/32768;
    for ( int i = 0; i<n; i+=8) {
        Vec4i u4i_v1 = Vec4i().load(&uIntegers[i]);
        Vec4d u4d_v1  = to_double(u4i_v1);
        u4d_v1*=div;
        u4d_v1.store(&uDoubles[i]);

        Vec4i u4i_v2 = Vec4i().load(&uIntegers[i+4]);
        Vec4d u4d_v2  = to_double(u4i_v2);
        u4d_v2*=div;
        u4d_v2.store(&uDoubles[i+4]);
    }
}

void convert_vec4_openmp(const int *uIntegers, double *uDoubles, const int n) {
    #pragma omp parallel for    
    for ( int i = 0; i<n; i+=4) {
        Vec4i u4i = Vec4i().load(&uIntegers[i]);
        Vec4d u4d  = to_double(u4i);
        u4d/=32768.0;
        u4d.store(&uDoubles[i]);
    }
}

void convert_vec4_unroll2_openmp(const int *uIntegers, double *uDoubles, const int n) {
    Vec4d div = 1.0/32768;
    #pragma omp parallel for    
    for ( int i = 0; i<n; i+=8) {
        Vec4i u4i_v1 = Vec4i().load(&uIntegers[i]);
        Vec4d u4d_v1  = to_double(u4i_v1);
        u4d_v1*=div;
        u4d_v1.store(&uDoubles[i]);

        Vec4i u4i_v2 = Vec4i().load(&uIntegers[i+4]);
        Vec4d u4d_v2  = to_double(u4i_v2);
        u4d_v2*=div;
        u4d_v2.store(&uDoubles[i+4]);
    }
}

double compare(double *a, double *b, const int n) {
    double diff = 0.0;
    for(int i=0; i<n; i++) {
        double tmp = a[i] - b[i];
        //printf("%d %f %f \n", i, a[i], b[i]);
        if(tmp<0) tmp*=-1;
        diff += tmp;
    }
    return diff;
}

void loop(const int n, const int repeat, const int ifunc) {
    void (*fp[4])(const int *uIntegers, double *uDoubles, const int n);

    int *a = (int*)_mm_malloc(sizeof(int)* n, 32);
    double *b1_cmp = (double*)_mm_malloc(sizeof(double)*n, 32);
    double *b1 = (double*)_mm_malloc(sizeof(double)*n, 32);
    double dtime;

    const char *fp_str[] = {
        "covert",
        "convert_vec4",
        "convert_vec4_unroll2",
        "convert_vec4_unroll2_openmp",
    };

    for(int i=0; i<n; i++) {
        a[i] = rand()*RAND_MAX;
    }

    fp[0] = convert;
    fp[1] = convert_vec4;
    fp[2] = convert_vec4_unroll2;
    fp[3] = convert_vec4_unroll2_openmp;

    convert(a, b1_cmp, n);

    dtime = omp_get_wtime();
    for(int i=0; i<repeat; i++) {
        fp[ifunc](a, b1, n);
    }
    dtime = omp_get_wtime() - dtime;
    printf("\t%s time %.2f, error %f\n", fp_str[ifunc], dtime, compare(b1_cmp,b1,n));

    _mm_free(a);
    _mm_free(b1_cmp);
    _mm_free(b1);
}

int main() {
    double dtime;
    int l1 = (32*1024)/(sizeof(int) + sizeof(double));
    int l2 = (256*1024)/(sizeof(int) + sizeof(double));
    int l3 = (8*1024*1024)/(sizeof(int) + sizeof(double));
    int lx = (100*1024*1024)/(sizeof(int) + sizeof(double));
    int n[] = {l1, l2, l3, lx};

    int repeat[] = {300000, 30000, 1000, 100};
    const char *cache_str[] = {"l1", "l2", "l3", "main memory"};
    for(int c=0; c<4; c++ ) {
        int lda = ((n[c]+7) & -8); //make sure array is a multiple of 8
        printf("%s chache, n %d\n", cache_str[c], lda);
        for(int i=0; i<4; i++) {
            loop(lda, repeat[c], i);
        } printf("\n");
    }
}

Enfin, si vous avez lu jusqu'ici et avez envie de me rappeler que mon code ressemble plus à du C qu'à du C++, lisez d'abord ce qui suit avant de décider de commenter http://www.stroustrup.com/sibling_rivalry.pdf

1voto

Lee Daniel Crocker Points 4812

Vous pourriez aussi essayer :

uDoubles[i] = ldexp((double)uIntegers[i], -15);

1voto

FrankH. Points 8850

Editar: Voir la réponse d'Adam ci-dessus pour une version utilisant les intrinsèques SSE. C'est mieux que ce que j'avais ici ...

Pour rendre cela plus utile, regardons ici le code généré par le compilateur. J'utilise gcc 4.8.0 et oui, il il est utile de vérifier votre compilateur spécifique (version) car il y a des différences assez significatives dans les résultats pour, disons, gcc 4.4, 4.8, clang 3.2 ou l'icc d'Intel.

Votre original, utilisant g++ -O8 -msse4.2 ... se traduit par la boucle suivante :

.L2:
    cvtsi2sd        (%rcx,%rax,4), %xmm0
    mulsd   %xmm1, %xmm0
    addl    $1, %edx
    movsd   %xmm0, (%rsi,%rax,8)
    movslq  %edx, %rax
    cmpq    %rdi, %rax
    jbe     .L2

donde %xmm1 tient 1.0/32768.0 donc le compilateur transforme automatiquement la division en multiplication par l'inverse.

D'autre part, en utilisant g++ -msse4.2 -O8 -funroll-loops ... le code créé pour la boucle change de manière significative :

[ ... ]
    leaq    -1(%rax), %rdi
    movq    %rdi, %r8
    andl    $7, %r8d
    je      .L3
[ ... insert a duff's device here, up to 6 * 2 conversions ... ]
    jmp     .L3
    .p2align 4,,10
    .p2align 3
.L39:
    leaq    2(%rsi), %r11
    cvtsi2sd        (%rdx,%r10,4), %xmm9
    mulsd   %xmm0, %xmm9
    leaq    5(%rsi), %r9
    leaq    3(%rsi), %rax
    leaq    4(%rsi), %r8
    cvtsi2sd        (%rdx,%r11,4), %xmm10
    mulsd   %xmm0, %xmm10
    cvtsi2sd        (%rdx,%rax,4), %xmm11
    cvtsi2sd        (%rdx,%r8,4), %xmm12
    cvtsi2sd        (%rdx,%r9,4), %xmm13
    movsd   %xmm9, (%rcx,%r10,8)
    leaq    6(%rsi), %r10
    mulsd   %xmm0, %xmm11
    mulsd   %xmm0, %xmm12
    movsd   %xmm10, (%rcx,%r11,8)
    leaq    7(%rsi), %r11
    mulsd   %xmm0, %xmm13
    cvtsi2sd        (%rdx,%r10,4), %xmm14
    mulsd   %xmm0, %xmm14
    cvtsi2sd        (%rdx,%r11,4), %xmm15
    mulsd   %xmm0, %xmm15
    movsd   %xmm11, (%rcx,%rax,8)
    movsd   %xmm12, (%rcx,%r8,8)
    movsd   %xmm13, (%rcx,%r9,8)
    leaq    8(%rsi), %r9
    movsd   %xmm14, (%rcx,%r10,8)
    movsd   %xmm15, (%rcx,%r11,8)
    movq    %r9, %rsi
.L3:
    cvtsi2sd        (%rdx,%r9,4), %xmm8
    mulsd   %xmm0, %xmm8
    leaq    1(%rsi), %r10
    cmpq    %rdi, %r10
    movsd   %xmm8, (%rcx,%r9,8)
    jbe     .L39
[ ... out ... ]

Il bloque donc les opérations vers le haut, mais convertit toujours une valeur à la fois.

Si vous modifiez votre boucle originale pour opérer sur quelques éléments par itération :

size_t i;
for (i = 0; i < uIntegers.size() - 3; i += 4)
{
    uDoubles[i] = uIntegers[i] / 32768.0;
    uDoubles[i+1] = uIntegers[i+1] / 32768.0;
    uDoubles[i+2] = uIntegers[i+2] / 32768.0;
    uDoubles[i+3] = uIntegers[i+3] / 32768.0;
}
for (; i < uIntegers.size(); i++)
    uDoubles[i] = uIntegers[i] / 32768.0;

le compilateur, gcc -msse4.2 -O8 ... (c'est-à-dire même sans dérouler les requêtes), identifie le potentiel d'utilisation de CVTDQ2PD / MULPD et le cœur de la boucle devient :

    .p2align 4,,10
    .p2align 3
.L4:
    movdqu  (%rcx), %xmm0
    addq    $16, %rcx
    cvtdq2pd        %xmm0, %xmm1
    pshufd  $238, %xmm0, %xmm0
    mulpd   %xmm2, %xmm1
    cvtdq2pd        %xmm0, %xmm0
    mulpd   %xmm2, %xmm0
    movlpd  %xmm1, (%rdx,%rax,8)
    movhpd  %xmm1, 8(%rdx,%rax,8)
    movlpd  %xmm0, 16(%rdx,%rax,8)
    movhpd  %xmm0, 24(%rdx,%rax,8)
    addq    $4, %rax
    cmpq    %r8, %rax
    jb      .L4
    cmpq    %rdi, %rax
    jae     .L29
[ ... duff's device style for the "tail" ... ]
.L29:
    rep ret

C'est à dire que maintenant le compilateur reconnaît l'opportunité de mettre deux double par registre SSE, et effectuer des multiplications / conversions parallèles. Ceci est assez proche du code que la version SSE intrinsèque d'Adam générerait.

Le code dans son ensemble (je n'en ai montré qu'environ 1/6e) est beaucoup plus complexe que les intrinsèques "directs", en raison du fait que, comme mentionné, le compilateur essaie de prépendre/appliquer des "têtes" et des "queues" non alignées/non-bloquées multiples à la boucle. Cela dépend largement des tailles moyennes/attendues de vos vecteurs si cela sera bénéfique ou non ; pour le cas "générique" (vecteurs de plus de deux fois la taille du bloc traité par la boucle "innermost"), cela aidera.

Le résultat de cet exercice est, en grande partie ... que, si vous contraignez (par les options/optimisation du compilateur) ou incitez (en réarrangeant légèrement le code) votre compilateur à faire la bonne chose, alors pour ce type spécifique de boucle de copie/conversion, il produit un code qui ne sera pas très en retard sur les intrinsèques écrits à la main.

Expérience finale ... faire le code :

static double c(int x) { return x / 32768.0; }
void Convert(const std::vector<int>& uIntegers, std::vector<double>& uDoubles)
{
    std::transform(uIntegers.begin(), uIntegers.end(), uDoubles.begin(), c);
}

et (pour la sortie assembleur la plus agréable à lire, cette fois en utilisant gcc 4.4 avec gcc -O8 -msse4.2 ... ) la boucle de base de l'assemblage généré (encore une fois, il y a un bit pré/post) devient :

    .p2align 4,,10
    .p2align 3
.L8:
    movdqu  (%r9,%rax), %xmm0
    addq    $1, %rcx
    cvtdq2pd        %xmm0, %xmm1
    pshufd  $238, %xmm0, %xmm0
    mulpd   %xmm2, %xmm1
    cvtdq2pd        %xmm0, %xmm0
    mulpd   %xmm2, %xmm0
    movapd  %xmm1, (%rsi,%rax,2)
    movapd  %xmm0, 16(%rsi,%rax,2)
    addq    $16, %rax
    cmpq    %rcx, %rdi
    ja      .L8
    cmpq    %rbx, %rbp
    leaq    (%r11,%rbx,4), %r11
    leaq    (%rdx,%rbx,8), %rdx
    je      .L10
[ ... ]
.L10:
[ ... ]
    ret

Avec cela, qu'apprenons-nous ? Si vous voulez utiliser le C++, utiliser réellement C++ ;-)

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X