2 votes

Comment trier efficacement et de manière incrémentale des vecteurs en Python?

Dans une boucle qui collecte des échantillons, j'ai besoin d'obtenir des statistiques sur leurs indices triés de temps en temps, pour lesquels argsort retourne exactement ce dont j'ai besoin. Cependant, chaque itération n'ajoute qu'un seul échantillon, et c'est un énorme gaspillage de ressources de continuer à passer tout le tableau d'échantillons à la fonction argsort, surtout que le tableau d'échantillons est très énorme. N'y a-t-il pas une technique efficace et incrémentielle équivalente à argsort?

Je crois qu'une fonction d'argsort incrémentielle et efficace peut être mise en œuvre en maintenant une liste ordonnée d'échantillons, qui peut être recherchée pour les bons indices d'insertion une fois qu'un nouvel échantillon arrive. Ces indices peuvent ensuite être utilisés à la fois pour maintenir l'ordre de la liste des échantillons et pour générer la sortie désirée similaire à argsort incrémentiel. Jusqu'à présent, j'ai utilisé la fonction searchsorted2d de @Divakar, avec de légères modifications pour obtenir les indices d'insertion, et j'ai construit une routine qui peut obtenir la sortie désirée si elle est appelée après chaque insertion d'échantillon ( b = 1). Pourtant, cela n'est pas efficace, et j'aimerais appeler la routine après la collecte de k-ième échantillons (par exemple b = 10). Dans le cas d'insertions en masse, searchsorted2d semble renvoyer des indices incorrects, et c'est là que je me suis arrêté!

import time
import numpy as np

# Par Divakar
# Voir https://stackoverflow.com/a/40588862
def searchsorted2d(a, b):
    m, n = a.shape
    max_num = np.maximum(a.max() - a.min(), b.max() - b.min()) + 1
    r = max_num * np.arange(m)[:,np.newaxis]
    p = np.searchsorted((a + r).ravel(), (b + r).ravel()).reshape(b.shape)
    return p #- n * (np.arange(m)[:,np.newaxis])

# Ce qui suit fonctionne avec une taille de lot b = 1,
# mais ce n'est pas efficace ...
# Pouvons-nous le faire fonctionner pour n'importe quelle valeur b > 0?
class incremental(object):
    def __init__(self, shape):
        # Exprimer chaque décalage de ligne
        self.ranks_offset = np.tile(np.arange(shape[1]).reshape(1, -1),
                                    (shape[0], 1))
        # Stockage des échantillons triés
        self.a_sorted = np.empty((shape[0], 0))
        # Stockage des indices de tri
        self.a_ranks = np.empty((shape[0], 0), np.int)

    def argsort(self, a):
        if self.a_sorted.shape[1] == 0: # Utiliser np.argsort pour l'initialisation
            self.a_ranks = a.argsort(axis=1)
            self.a_sorted = np.take_along_axis(a, self.a_ranks, 1)
        else: # Dans les itérations ultérieures,
            # searchsorted de l'incrément d'entrée
            indices = searchsorted2d(self.a_sorted, a)
            # insérer la position de la pile pour suivre les indices de tri
            self.a_ranks = np.insert(self.a_ranks, indices.ravel(),
                                     self.ranks_offset.ravel() +
                                     self.a_ranks.shape[1]).reshape((n, -1))
            # insérer les incréments pour maintenir un tableau d'entrée trié
            self.a_sorted = np.insert(self.a_sorted, indices.ravel(),
                                      a.ravel()).reshape((n, -1))
        return self.a_ranks

M = 1000 # nombre d'itérations
n = 16   # taille du vecteur
b = 10   # taille du lot de vecteurs

# Stockage des échantillons
samples = np.zeros((n, M)) * np.nan

# L'approche proposée
inc = incremental((n, b))

c = 0 # compteur d'itérations
tick = time.time()
while c < M:
    if c % b == 0: # Effectuer des calculs en lot
        #sample_ranks = samples[:,:c].argsort(axis=1)
        sample_ranks = inc.argsort(samples[:,max(0,c-b):c]) # Argsort incrémentiel

        ######################################################
        # Utiliser sample_ranks dans des statistiques magiques ici #
        ######################################################

    samples[:,c] = np.random.rand(n) # collecter un échantillon
    c += 1 # incrémenter le compteur
tock = time.time()

dernier = ((c-1) // b) * b
sample_ranks_GT = samples[:,:last].argsort(axis=1) # Vérité terrain des rangs
print('Compatibilité: {0:.1f}%'.format(
      100 * np.count_nonzero(sample_ranks == sample_ranks_GT) / sample_ranks.size))
print('Temps écoulé: {0:.1f}ms'.format(
      (tock - tick) * 1000))

Je m'attends à une compatibilité de 100% avec la fonction argsort, mais il faut être plus efficace que d'appeler argsort. En ce qui concerne le temps d'exécution avec une approche incrémentielle, il semble que 15 ms environ devraient être suffisants pour l'exemple donné. Jusqu'à présent, une seule condition parmi ces deux peut être respectée avec l'une des techniques explorées.


Pour faire court, l'algorithme présenté ci-dessus semble être une variante d'un arbre d'ordre statistique pour estimer les rangs des données, mais il échoue à le faire lorsque des échantillons sont ajoutés en masse (b > 1). Jusqu'à présent, il fonctionne uniquement lorsque des échantillons sont insérés un par un (b = 1). Cependant, les tableaux sont copiés à chaque fois que insert est appelé, ce qui entraîne une énorme surcharge et forme un goulot d'étranglement, donc les échantillons doivent être ajoutés en blocs plutôt qu'individuellement.

Pouvez-vous introduire un algorithme d'argsort incrémentiel plus efficace, ou du moins comprendre comment prendre en charge l'insertion en blocs (b > 1) dans celui-ci?

Si vous choisissez de partir de là où je me suis arrêté, alors le problème peut être réduit à corriger le bogue dans la capture d'écran suivante:

import numpy as np

# Par Divakar
# Voir https://stackoverflow.com/a/40588862
def searchsorted2d(a, b):
    m, n = a.shape
    max_num = np.maximum(a.max() - a.min(), b.max() - b.min()) + 1
    r = max_num * np.arange(m)[:,np.newaxis]
    p = np.searchsorted((a + r).ravel(), (b + r).ravel()).reshape(b.shape)
    # Il semble que le bogue soit ici...
    #return p - b.shape[0] * np.arange(b.shape[1])[np.newaxis]
    #return p - b.shape[1] * np.arange(b.shape[0])[:,np.newaxis]
    return p

n = 16  # taille du vecteur
b = 2   # taille du lot de vecteurs

a = np.random.rand(n, 1) # Tableau d'échantillons
a_ranks = a.argsort(axis=1) # Rangs initiaux
a_sorted = np.take_along_axis(a, a_ranks, 1) # Tableau trié initial

new_data = np.random.rand(n, b) # Nouveau bloc à ajouter dans le tableau d'échantillons
a = np.hstack((a, new_data)) #Ajouter un nouveau bloc

indices = searchsorted2d(a_sorted, new_data) # Calculer les indices d'insertion
ranks_offset = np.tile(np.arange(b).reshape(1, -1), (a_ranks.shape[0], 1)) + a_ranks.shape[1] # Rangs à insérer
a_ranks = np.insert(a_ranks, indices.ravel(), ranks_offset.ravel()).reshape((n, -1)) # Insérer des rangs en fonction de leurs indices
a_ransk_GT = a.argsort(axis=1) # Rangs vérité terrain

masque = (a_ranks == a_ransk_GT)
print(masque) # Pourquoi ne sont-ils pas tous vrais?
assert(np.all(masque)), "Oups!" #Cela ne devrait pas échouer, mais c'est le cas :(

Il semble que l'insertion en bloc soit plus complexe que ce que je pensais initialement, et que searchsorted2d n'est pas à blâmer. Prenez le cas d'un tableau trié a = [ 1, 2, 5 ], et deux nouveaux éléments block = [3, 4] à insérer. Si on itère et on insère, alors np.searchsorted(a, block[i]) renverrait [2] et [3], et c'est juste correct. Cependant, si nous appelons np.searchsorted(a, block) (le comportement souhaité - équivalent à l'itération sans insertion), nous obtiendrions [2, 2]. Cela pose problème pour implémenter un argsort incrémentiel, car même np.searchsorted(a, block[::-1]) donnerait le même résultat. Avez-vous une idée?

0voto

Hamdi Points 41

Il s'est avéré que les indices retournés par searchsorted ne sont pas suffisants pour garantir un tableau trié lors du traitement d'entrées par lots. Si le bloc à insérer contient deux entrées qui ne sont pas dans l'ordre, mais qui finiront par être placées côte à côte dans le tableau cible, alors ils recevront le même indice d'insertion exact, ce qui les fera être insérés dans leur ordre actuel, causant la faille. Par conséquent, le bloc d'entrée lui-même doit être trié avant l'insertion. Voir le dernier paragraphe de la question pour un exemple numérique.

En triant le bloc d'entrée et en adaptant les parties restantes, une solution 100,0% compatible avec argsort est obtenue, et elle est très efficace (le temps écoulé est de 15,6 ms pour l'insertion de 1000 entrées par blocs de dix b = 10). Cela peut être reproduit en remplaçant la classe défectueuse incremental trouvée dans la question par celle-ci :

# par Hamdi Sahloul
class incremental(object):
    def __init__(self, shape):
        # Stockage des échantillons triés
        self.a_sorted = np.empty((shape[0], 0))
        # Stockage des indices de tri
        self.a_ranks = np.empty((shape[0], 0), np.int)

    def argsort(self, block):
        # Calcul des rangs du bloc d'entrée
        block_ranks = block.argsort(axis=1)
        # Trier le bloc en conséquence
        block_sorted = np.take_along_axis(block, block_ranks, 1)
        if self.a_sorted.shape[1] == 0: # Initialisation en utilisant les données du bloc
            self.a_ranks = block_ranks
            self.a_sorted = block_sorted
        else: # Dans les itérations ultérieures,
            # searchsorted du bloc d'entrée
            indices = searchsorted2d(self.a_sorted, block_sorted)
            # mettre à jour les rangs globaux
            self.a_ranks = np.insert(self.a_ranks, indices.ravel(),
                                     block_ranks.ravel() +
                                     self.a_ranks.shape[1]).reshape((block.shape[0], -1))
            # mettre à jour le tableau trié global
            self.a_sorted = np.insert(self.a_sorted, indices.ravel(),
                                      block_sorted.ravel()).reshape((block.shape[0], -1))
        return self.a_ranks

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X