214 votes

Moyen le plus rapide de trier 10 numéros ? (les numéros sont 32 bits)

Je suis à la résolution d'un problème et qu'elle implique le tri 10 numéros (int32) très rapidement. Mon application doit trier les 10 numéros de millions de fois plus vite que possible. Je suis d'échantillonnage d'un ensemble de données de milliards d'éléments, et chaque fois que je dois choisir 10 numéros (simplifié) et le tri (et de tirer des conclusions à partir de la triés 10 élément de la liste).

Actuellement, je suis en utilisant le tri par insertion, mais j'imagine que je pourrais mettre en œuvre très rapide, personnalisé algorithme de tri pour mon problème spécifique de 10 numéros, qui a battu le tri par insertion.

Quelqu'un a une idée sur la façon d'aborder ce problème?

215voto

m69 Points 987

(Suite à la suggestion de HelloWorld à regarder dans le tri des réseaux.)

Il semble qu'un 29-comparaison/swap réseau est le moyen le plus rapide pour faire 10 d'entrée de tri. J'ai utilisé le réseau découvert par Waksman en 1969 pour cet exemple en Javascript, ce qui devrait se traduire directement en C, comme il est juste une liste d' if des déclarations.

function sortNet10(data) {	// ten-input sorting network by Waksman, 1969
    var swap;
    if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
    if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
    if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
    if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
    if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
    if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
    if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
    if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
    if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
    if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
    if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
    if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
    if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
    if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
    if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
    if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
    if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
    if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
    if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
    if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
    if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
    if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
    if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
    if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
    if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
    if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
    if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
    if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
    if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
    return(data);
}

alert(sortNet10([5,7,1,8,4,3,6,9,2,0]));

(Mise à JOUR)

Voici une représentation graphique du réseau, divisée en indépendant phases.
10-input sorting network (Waksman, 1969)
Pour profiter du traitement parallèle, la 5-4-3-4-4-4-3-2 groupement peut être changé en un 4-4-4-4-4-4-3-2 groupement.
10-input sorting network (Waksman, 1969) re-grouped

89voto

HelloWorld Points 645

Lorsque vous faites affaire avec cette taille fixe essayer réseaux de tri. Gardez à l’esprit cet algorithme fonctionne mieux avec len(n) https://en.wikipedia.org/wiki/Sorting_network

Une bonne page pour comparer les algorithmes de tri est-ce un ici (même si sa manque le réseau tri.

http://www.Sorting-Algorithms.com

34voto

Peter Cordes Points 1375

L'utilisation d'un tri réseau qui a des comparaisons par groupes de 4, de sorte que vous pouvez le faire en SIMD registres. Une paire de paniers min/max des instructions met en œuvre des paniers-comparateur de fonction. Désolé je n'ai pas le temps maintenant de chercher une page je me souviens avoir vu à ce sujet, mais j'espère que la recherche sur SIMD ou ESS tri des réseaux de tourner quelque chose.

x86 de l'ESS n'ont paniers-32 bits d'un entier min et max des instructions pour les vecteurs de quatre entiers 32 bits. AVX2 (Haswell et plus tard) ont la même mais pour 256b vecteurs de 8 ints. Il y a également efficace shuffle instructions.

Si vous avez beaucoup de indépendants de petites sortes, il pourrait être possible de faire 4 ou 8 sortes en parallèle à l'aide de vecteurs. Esp. si vous êtes de choisir des éléments au hasard (de sorte que les données soient triées de ne pas être contigus en mémoire de toute façon), vous pouvez éviter de mélange et de comparer simplement dans l'ordre que vous avez besoin. 10 registres pour contenir toutes les données à partir de 4 (AVX2: 8) la liste des 10 ints laisse encore 6 regs pour l'espace de travail.

Vecteur de tri des réseaux sont de moins en moins efficace si vous aussi vous avez besoin de trier les données associées. Dans ce cas, le moyen le plus efficace semble être d'utiliser un pique-comparer pour obtenir un masque de quels éléments a changé, et utiliser un masque de fusion des vecteurs de (références) de données associées.

26voto

DarioP Points 1350

Ce sujet d'une déroulés, d'une branche moins de tri de la sélection?

#include <iostream>
#include <algorithm>
#include <random>

//return the index of the minimum element in array a
int min(const int * const a) {
  int m = a[0];
  int indx = 0;
  #define TEST(i) (m > a[i]) && (m = a[i], indx = i ); 
  //see http://stackoverflow.com/a/7074042/2140449
  TEST(1);
  TEST(2);
  TEST(3);
  TEST(4);
  TEST(5);
  TEST(6);
  TEST(7);
  TEST(8);
  TEST(9);
  #undef TEST
  return indx;
}

void sort( int * const a ){
  int work[10];
  int indx;
  #define GET(i) indx = min(a); work[i] = a[indx]; a[indx] = 2147483647; 
  //get the minimum, copy it to work and set it at max_int in a
  GET(0);
  GET(1);
  GET(2);
  GET(3);
  GET(4);
  GET(5);
  GET(6);
  GET(7);
  GET(8);
  GET(9);
  #undef GET
  #define COPY(i) a[i] = work[i];
  //copy back to a
  COPY(0);
  COPY(1);
  COPY(2);
  COPY(3);
  COPY(4);
  COPY(5);
  COPY(6);
  COPY(7);
  COPY(8);
  COPY(9);
  #undef COPY
}

int main() {
  //generating and printing a random array
  int a[10] = { 1,2,3,4,5,6,7,8,9,10 };
  std::random_device rd;
  std::mt19937 g(rd());
  std::shuffle( a, a+10, g);
  for (int i = 0; i < 10; i++) {
    std::cout << a[i] << ' ';
  }
  std::cout << std::endl;

  //sorting and printing again
  sort(a);
  for (int i = 0; i < 10; i++) {
    std::cout << a[i] << ' ';
  } 

  return 0;
}

http://coliru.stacked-crooked.com/a/71e18bc4f7fa18c6

Les seules lignes sont les deux premiers #define.

Il utilise deux listes et entièrement vérifiez à nouveau la première pour dix fois ce qui serait un mal mis en œuvre de tri de la sélection, mais il évite de branches et de longueur variable des boucles, ce qui peut compenser avec des processeurs modernes et un petit ensemble de données.


Référence

Je la compare à la tri réseau, et mon code semble être plus lent. Cependant j'ai essayé d'enlever le déroulage et la copie. L'exécution de ce code:

#include <iostream>
#include <algorithm>
#include <random>
#include <chrono>

int min(const int * const a, int i) {
  int m = a[i];
  int indx = i++;
  for ( ; i<10; i++) 
    //see http://stackoverflow.com/a/7074042/2140449
    (m > a[i]) && (m = a[i], indx = i ); 
  return indx;
}

void sort( int * const a ){
  for (int i = 0; i<9; i++)
    std::swap(a[i], a[min(a,i)]); //search only forward
}


void sortNet10(int * const data) {  // ten-input sorting network by Waksman, 1969
    int swap;
    if (data[0] > data[5]) { swap = data[0]; data[0] = data[5]; data[5] = swap; }
    if (data[1] > data[6]) { swap = data[1]; data[1] = data[6]; data[6] = swap; }
    if (data[2] > data[7]) { swap = data[2]; data[2] = data[7]; data[7] = swap; }
    if (data[3] > data[8]) { swap = data[3]; data[3] = data[8]; data[8] = swap; }
    if (data[4] > data[9]) { swap = data[4]; data[4] = data[9]; data[9] = swap; }
    if (data[0] > data[3]) { swap = data[0]; data[0] = data[3]; data[3] = swap; }
    if (data[5] > data[8]) { swap = data[5]; data[5] = data[8]; data[8] = swap; }
    if (data[1] > data[4]) { swap = data[1]; data[1] = data[4]; data[4] = swap; }
    if (data[6] > data[9]) { swap = data[6]; data[6] = data[9]; data[9] = swap; }
    if (data[0] > data[2]) { swap = data[0]; data[0] = data[2]; data[2] = swap; }
    if (data[3] > data[6]) { swap = data[3]; data[3] = data[6]; data[6] = swap; }
    if (data[7] > data[9]) { swap = data[7]; data[7] = data[9]; data[9] = swap; }
    if (data[0] > data[1]) { swap = data[0]; data[0] = data[1]; data[1] = swap; }
    if (data[2] > data[4]) { swap = data[2]; data[2] = data[4]; data[4] = swap; }
    if (data[5] > data[7]) { swap = data[5]; data[5] = data[7]; data[7] = swap; }
    if (data[8] > data[9]) { swap = data[8]; data[8] = data[9]; data[9] = swap; }
    if (data[1] > data[2]) { swap = data[1]; data[1] = data[2]; data[2] = swap; }
    if (data[3] > data[5]) { swap = data[3]; data[3] = data[5]; data[5] = swap; }
    if (data[4] > data[6]) { swap = data[4]; data[4] = data[6]; data[6] = swap; }
    if (data[7] > data[8]) { swap = data[7]; data[7] = data[8]; data[8] = swap; }
    if (data[1] > data[3]) { swap = data[1]; data[1] = data[3]; data[3] = swap; }
    if (data[4] > data[7]) { swap = data[4]; data[4] = data[7]; data[7] = swap; }
    if (data[2] > data[5]) { swap = data[2]; data[2] = data[5]; data[5] = swap; }
    if (data[6] > data[8]) { swap = data[6]; data[6] = data[8]; data[8] = swap; }
    if (data[2] > data[3]) { swap = data[2]; data[2] = data[3]; data[3] = swap; }
    if (data[4] > data[5]) { swap = data[4]; data[4] = data[5]; data[5] = swap; }
    if (data[6] > data[7]) { swap = data[6]; data[6] = data[7]; data[7] = swap; }
    if (data[3] > data[4]) { swap = data[3]; data[3] = data[4]; data[4] = swap; }
    if (data[5] > data[6]) { swap = data[5]; data[5] = data[6]; data[6] = swap; }
}


std::chrono::duration<double> benchmark( void(*func)(int * const), const int seed ) {
  std::mt19937 g(seed);
  int a[10] = {10,11,12,13,14,15,16,17,18,19};
  std::chrono::high_resolution_clock::time_point t1, t2; 
  t1 = std::chrono::high_resolution_clock::now();
  for (long i = 0; i < 1e7; i++) {
    std::shuffle( a, a+10, g);
    func(a);
  }
  t2 = std::chrono::high_resolution_clock::now();
  return std::chrono::duration_cast<std::chrono::duration<double>>(t2 - t1);
}

int main() {
  std::random_device rd;
  for (int i = 0; i < 10; i++) {
    const int seed = rd();
    std::cout << "seed = " << seed << std::endl;
    std::cout << "sortNet10: " << benchmark(sortNet10, seed).count() << std::endl;
    std::cout << "sort:      " << benchmark(sort,      seed).count() << std::endl;
  }
  return 0;
}

Je suis toujours un meilleur résultat, pour la direction,-moins de tri de la sélection par rapport à la tri réseau.

$ gcc -v
gcc version 5.2.0 (GCC) 
$ g++ -std=c++11 -Ofast sort.cpp && ./a.out
seed = -1727396418
sortNet10: 2.24137
sort:      2.21828
seed = 2003959850
sortNet10: 2.23914
sort:      2.21641
seed = 1994540383
sortNet10: 2.23782
sort:      2.21778
seed = 1258259982
sortNet10: 2.25199
sort:      2.21801
seed = 1821086932
sortNet10: 2.25535
sort:      2.2173
seed = 412262735
sortNet10: 2.24489
sort:      2.21776
seed = 1059795817
sortNet10: 2.29226
sort:      2.21777
seed = -188551272
sortNet10: 2.23803
sort:      2.22996
seed = 1043757247
sortNet10: 2.2503
sort:      2.23604
seed = -268332483
sortNet10: 2.24455
sort:      2.24304

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X