2 votes

Quelle est la meilleure stratégie de mise en œuvre ?

Cette question porte sur la meilleure stratégie à adopter pour mettre en œuvre la simulation suivante en C++.

J'essaie de faire une simulation dans le cadre d'un projet de recherche en physique, qui suit la dynamique d'une chaîne de nœuds dans l'espace. Chaque noeud contient une position ainsi que certains paramètres (courbure locale, vitesse, distance aux voisins etc ) qui évoluent tous dans le temps.

Chaque pas de temps peut être décomposé en quatre parties :

  • Calculer les paramètres locaux. Les valeurs dépendent des voisins les plus proches dans la chaîne.
  • Calculer les paramètres globaux.
  • Évolution. La position de chaque nœud est légèrement déplacée, en fonction de paramètres globaux et locaux, et de champs de force aléatoires.
  • Rembourrage. De nouveaux nœuds sont insérés si la distance entre deux nœuds consécutifs atteint une valeur critique.

(En outre, les nœuds peuvent être bloqués, ce qui les rend inactifs pour le reste de la simulation. Les paramètres locaux des nœuds inactifs ayant des voisins inactifs ne changent pas et ne nécessitent pas de calcul supplémentaire).

Chaque noeud contient ~ 60 octets, j'ai ~ 100 000 noeuds dans la chaîne, et je dois faire évoluer la chaîne d'environ ~ 1 000 000 pas de temps. J'aimerais cependant maximiser ces nombres, car cela augmenterait la précision de ma simulation, mais à condition que la simulation soit effectuée en un temps raisonnable (~heures). (~30 % des noeuds seront inactifs).

J'ai commencé à mettre en œuvre cette simulation sous la forme d'une liste doublement liée en C++. Cela semble naturel, car j'ai besoin d'insérer de nouveaux nœuds entre les nœuds existants, et parce que les paramètres locaux dépendent des voisins les plus proches. (J'ai ajouté un pointeur supplémentaire vers le prochain nœud actif, afin d'éviter tout calcul inutile lorsque je boucle sur l'ensemble de la chaîne).

Je ne suis pas un expert en matière de parallélisation (ou de codage d'ailleurs), mais j'ai joué avec OpenMP, et j'aime beaucoup la façon dont je peux accélérer des boucles d'opérations indépendantes avec deux lignes de code. Je ne sais pas comment faire en sorte que ma liste chaînée fasse des choses en parallèle, ni même si cela fonctionne ( ?). J'ai donc eu l'idée de travailler avec un vecteur stl. Au lieu d'avoir des pointeurs vers les voisins les plus proches, je pourrais stocker les indices des voisins et y accéder par une recherche standard. Je pourrais aussi trier le vecteur par la position de la chaîne (tous les xième pas de temps) pour obtenir une meilleure localité en mémoire. Cette approche permettrait de faire des boucles à la manière d'OpenMP.

Je suis un peu intimidé par l'idée, car je n'ai pas à m'occuper de la gestion de la mémoire. Et je suppose que l'implémentation du vecteur stl est bien meilleure que ma simple façon de gérer les nœuds de la liste avec les commandes 'new' et 'delete'. Je sais que j'aurais pu faire la même chose avec des listes stl, mais je n'aime pas la façon dont je dois accéder aux plus proches voisins avec des itérateurs.

Je vous demande donc, 1337 h4x0r et programmeurs compétents, quelle serait une meilleure conception pour ma simulation ? L'approche vectorielle esquissée ci-dessus est-elle une bonne idée ? Ou y a-t-il des astuces à jouer sur les listes chaînées pour les faire fonctionner avec OpenMP ? Ou dois-je envisager une approche totalement différente ?

La simulation sera exécutée sur un ordinateur doté de 8 cœurs et d'une mémoire vive de 48 Go, je suppose donc que je peux échanger beaucoup de mémoire contre de la vitesse.

Merci d'avance

Editer : J'ai besoin d'ajouter 1 à 2 % de nouveaux nœuds à chaque pas de temps, de sorte que leur stockage sous forme de vecteur sans indices des voisins les plus proches ne fonctionnera pas, à moins que je ne trie le vecteur à chaque pas de temps.

2voto

Jonathan Dursi Points 25143

Il s'agit d'une question classique de compromis. L'utilisation d'un tableau ou d'un std::vector rendra les calculs plus rapides et les insertions plus lentes ; l'utilisation d'une liste doublement liée ou d'un std::list rendra les insertions plus rapides et les calculs plus lents.

La seule façon d'évaluer les questions de compromis est empirique : quelle est la solution la plus rapide pour votre application particulière ? Tout ce que vous pouvez faire, c'est essayer les deux solutions et voir. Plus le calcul est intense et plus le stencil est court (par exemple, l'intensité du calcul - combien de flops vous devez effectuer par quantité de mémoire que vous devez apporter), moins un tableau standard sera important. Mais en fait, vous devriez simuler une implémentation de votre calcul de base dans les deux sens et voir si cela a de l'importance. J'ai bricolé un très Le système std::vector et std::list est probablement erroné à bien des égards, mais vous pouvez l'essayer et jouer avec certains paramètres pour voir ce qui vous convient le mieux. Sur mon système, pour les tailles et la quantité de calcul données, list est plus rapide, mais il est possible de faire l'un ou l'autre assez facilement.

En ce qui concerne openmp, oui, si c'est la voie que vous allez suivre, vous avez les mains quelque peu liées ; vous devrez presque certainement opter pour la structure vectorielle, mais vous devez d'abord vous assurer que le coût supplémentaire des insertions ne réduira pas à néant les avantages des cœurs multiples.

#include <iostream>
#include <list>
#include <vector>
#include <cmath>
#include <sys/time.h>
using namespace std;

struct node {
    bool stuck;
    double x[2];
    double loccurve;
    double disttoprev;
};

void tick(struct timeval *t) {
    gettimeofday(t, NULL);
}

/* returns time in seconds from now to time described by t */
double tock(struct timeval *t) {
    struct timeval now;
    gettimeofday(&now, NULL);
    return (double)(now.tv_sec - t->tv_sec) +
        ((double)(now.tv_usec - t->tv_usec)/1000000.);
}

int main()
{
    const int nstart = 100;
    const int niters = 100;
    const int nevery = 30;
    const bool doPrint = false;
    list<struct node>   nodelist;
    vector<struct node> nodevect;

    // Note - vector is *much* faster if you know ahead of time 
    //  maximum size of vector
    nodevect.reserve(nstart*30);

    // Initialize
    for (int i = 0; i < nstart; i++) {
        struct node *mynode = new struct node;
        mynode->stuck = false;
        mynode->x[0] = i; mynode->x[1] = 2.*i;
        mynode->loccurve = -1;
        mynode->disttoprev = -1;

        nodelist.push_back( *mynode );
        nodevect.push_back( *mynode );
    }

    const double EPSILON = 1.e-6;
    struct timeval listclock;
    double listtime;

    tick(&listclock);
    for (int i=0; i<niters; i++) {
        // Calculate local curvature, distance

        list<struct node>::iterator prev, next, cur;
        double dx1, dx2, dy1, dy2;

        next = cur = prev = nodelist.begin();
        cur++;
        next++; next++;
        dx1 = prev->x[0]-cur->x[0];
        dy1 = prev->x[1]-cur->x[1];

        while (next != nodelist.end()) {
            dx2 = cur->x[0]-next->x[0];
            dy2 = cur->x[1]-next->x[1];

            double slope1 = (dy1/(dx1+EPSILON));
            double slope2 = (dy2/(dx2+EPSILON));

            cur->disttoprev = sqrt(dx1*dx1 + dx2*dx2 );

            cur->loccurve = ( slope1*slope2*(dy1+dy2) +
                              slope2*(prev->x[0]+cur->x[0]) -
                              slope1*(cur->x[0] +next->x[0]) ) /
                            (2.*(slope2-slope1) + EPSILON);

            next++;
            cur++;
            prev++;
        }

        // Insert interpolated pt every neveryth pt
        int count = 1;
        next = cur = nodelist.begin();
        next++;
        while (next != nodelist.end()) {
            if (count % nevery == 0) {
                struct node *mynode = new struct node;
                mynode->x[0] = (cur->x[0]+next->x[0])/2.;
                mynode->x[1] = (cur->x[1]+next->x[1])/2.;
                mynode->stuck = false;
                mynode->loccurve = -1;
                mynode->disttoprev = -1;
                nodelist.insert(next,*mynode);
            }
            next++;
            cur++;
            count++;
        }
    }
                                                               51,0-1        40%

    struct timeval vectclock;
    double vecttime;

    tick(&vectclock);
    for (int i=0; i<niters; i++) {
        int nelem = nodevect.size();
        double dx1, dy1, dx2, dy2;
        dx1 = nodevect[0].x[0]-nodevect[1].x[0];
        dy1 = nodevect[0].x[1]-nodevect[1].x[1];

        for (int elem=1; elem<nelem-1; elem++) {
            dx2 = nodevect[elem].x[0]-nodevect[elem+1].x[0];
            dy2 = nodevect[elem].x[1]-nodevect[elem+1].x[1];

            double slope1 = (dy1/(dx1+EPSILON));
            double slope2 = (dy2/(dx2+EPSILON));

            nodevect[elem].disttoprev = sqrt(dx1*dx1 + dx2*dx2 );

            nodevect[elem].loccurve = ( slope1*slope2*(dy1+dy2) +
                              slope2*(nodevect[elem-1].x[0] +
                                      nodevect[elem].x[0])  -
                              slope1*(nodevect[elem].x[0] +
                                      nodevect[elem+1].x[0]) ) /
                            (2.*(slope2-slope1) + EPSILON);

        }

        // Insert interpolated pt every neveryth pt
        int count = 1;
        vector<struct node>::iterator next, cur;
        next = cur = nodevect.begin();
        next++;
        while (next != nodevect.end()) {
            if (count % nevery == 0) {
                struct node *mynode = new struct node;
                mynode->x[0] = (cur->x[0]+next->x[0])/2.;
                mynode->x[1] = (cur->x[1]+next->x[1])/2.;
                mynode->stuck = false;
                mynode->loccurve = -1;
                mynode->disttoprev = -1;
                nodevect.insert(next,*mynode);
            }
            next++;
            cur++;
            count++;
        }
    }
    vecttime = tock(&vectclock);

    cout << "Time for list: " << listtime << endl;
    cout << "Time for vect: " << vecttime << endl;

    vector<struct node>::iterator v;
    list  <struct node>::iterator l;

    if (doPrint) {
        cout << "Vector: " << endl;
        for (v=nodevect.begin(); v!=nodevect.end(); ++v) {
             cout << "[ (" << v->x[0] << "," << v->x[1] << "), " << v->disttoprev << ", " << v->loccurve << "] " << endl;
        }

        cout << endl << "List: " << endl;
        for (l=nodelist.begin(); l!=nodelist.end(); ++l) {
             cout << "[ (" << l->x[0] << "," << l->x[1] << "), " << l->disttoprev << ", " << l->loccurve << "] " << endl;
        }

    }

    cout << "List size is " << nodelist.size() << endl;
}

1voto

Oli Charlesworth Points 148744

En supposant que la création de nouveaux éléments soit relativement rare, j'adopterais l'approche des vecteurs triés, pour toutes les raisons que vous avez énumérées :

  • Pas de perte de temps à suivre les pointeurs/indices.
  • Tirer parti de la proximité spatiale
  • Beaucoup plus facile à mettre en parallèle

Bien entendu, pour que cela fonctionne, vous devez vous assurer que le vecteur est toujours trié, et non pas simplement à chaque k-ième pas de temps.

0voto

MSalters Points 74024

Cela semble être un bon exercice pour les étudiants en programmation parallèle.

Vous semblez avoir une structure de données qui mène naturellement à la distribution, une chaîne. Vous pouvez faire beaucoup de travail sur des sous-chaînes qui sont (semi)statiquement assignées à différents threads. Vous pourriez vouloir traiter les cas limites N-1 séparément, mais si les longueurs des sous-chaînes sont >3, elles sont isolées les unes des autres.

Bien sûr, entre chaque étape, vous devrez mettre à jour les variables globales, mais les variables telles que la longueur de la chaîne sont de simples ajouts parallèles. Il suffit de calculer la longueur de chaque sous-chaîne et de les additionner. Si vos sous-chaînes ont une longueur de 100000/8, le travail à un seul thread est l'addition de ces 8 longueurs de sous-chaînes entre les étapes.

Si la croissance des nœuds est très irrégulière, il peut être utile de rééquilibrer la longueur des sous-chaînes de temps en temps.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X