2 votes

Problème par dictionnaires pour utiliser la parallélisation numba njit pour accélérer le code

J'ai écrit un code et j'essaie d'utiliser numba pour accélérer le code. Le but principal du code est de regrouper certaines valeurs en fonction d'une condition. A cet égard, iter_ est utilisé pour faire converger le code afin de satisfaire la condition. J'ai préparé un petit cas ci-dessous pour reproduire l'exemple de code :

import numpy as np
import numba as nb

rng = np.random.default_rng(85)

# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
#                                        0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
#            'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
#                                        0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
#            'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
#                                        0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
#           'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
#           'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------

# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]

Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------

# values_ = [*values_.values()]
# params = [*params.values()]

# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):

final_dict = {}
for i, j in enumerate(values_.keys()):
    Rand_vals = []
    goal_sum = params[j][1] * params[j][3]
    tel = goal_sum / dec_ * 10
    if params[j][0] != 0:
        for k in range(Sno):
            final_sum = 0.0
            iter_ = 0
            t = 1
            while not np.allclose(goal_sum, final_sum, atol=tel):
                iter_ += 1
                vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
                # final_sum = 0.0016 * np.sum(vals_group)  # -----> For small data volume
                final_sum = np.sum(vals_group ** 3)        # -----> UPDATED For med or large data volume
                if iter_ == upd_:
                    t += 1
                    tel = t * tel
            values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
            Rand_vals.append(vals_group)
    else:
        Rand_vals = [np.array([])] * Sno
    final_dict["R" + str(i)] = Rand_vals

#    return final_dict

# test(values_, params, Sno, dec_, upd_)

Dans un premier temps, pour appliquer le numba sur ce code @nb.jit a été utilisé ( forceobj=True est utilisé pour éviter les avertissements et ), ce qui aura un effet négatif sur les performances. nopython est également vérifié, avec @nb.njit qui obtient l'erreur suivante en raison de ne pas soutenir (comme mentionné dans 1 , 2 ) type de dictionnaire des entrées :

ne peut pas déterminer le type de Numba de <classe 'dict'>

Je ne sais pas si (comment) cela pourrait être traité par Dict de numba.typed (en convertissant les dictionnaires python créés en numba Dict) ou si l'on convertit le fichier dictionnaires a listes de tableaux n'ont aucun avantage. Je pense que la parallélisation peut être possible si certaines lignes de code, par ex. Rand_vals.append(vals_group) o autre section ou être pris ou être modifié hors de la fonction pour obtenir les mêmes résultats que précédemment, mais je n'ai aucune idée de la façon de le faire.

Je vous serai reconnaissant d'aider à utiliser numba sur ce code. numba parallelization sera la solution la plus souhaitée (probablement la meilleure méthode applicable en termes de performances) si elle pouvait .


Données :

3voto

Jérôme Richard Points 7521

Ce code peut être converti en Numba mais ce n'est pas simple.

Tout d'abord, le type de dictionnaire et de liste doit être défini puisque Numba njit les fonctions ne peuvent pas opérer directement sur des listes réfléchies (aka. listes pure-python). C'est un peu fastidieux à faire dans Numba et le code résultant est un peu verbeux :

String = nb.types.unicode_type
ValueArray = nb.float64[::1]
ValueDict = nb.types.DictType(String, ValueArray)
ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
ParamDict = nb.types.DictType(String, ParamDictValue)
FinalDictValue = nb.types.ListType(ValueArray)
FinalDict = nb.types.DictType(String, FinalDictValue)

Ensuite, vous devez convertir les dictionnaires d'entrée :

nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
for key,value in values_.items():
    nbValues[key] = value.copy()

nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
for key,value in params.items():
    nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))

Ensuite, vous devez écrire la fonction principale. np.allclose y np.isin ne sont pas implémentés dans Numba, ils doivent donc être réimplémentés manuellement. Mais le point principal est que Numba ne supporte pas l'option rng Objet Numpy. Je pense qu'il ne le supportera certainement pas de sitôt. Notez que Numba a une implémentation de nombres aléatoires qui essaie d'imiter le comportement de Numpy mais la gestion de la graine est un peu différente. Notez également que les résultats devraient être les mêmes avec la fonction np.random.xxx Numpy fonctionne si la graine est fixée à la même valeur (Numpy et Numba ont des variables de graine différentes qui ne sont pas synchronisées).

@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
    final_dict = nb.typed.Dict.empty(String, FinalDictValue)
    for i, j in enumerate(values_.keys()):
        Rand_vals = nb.typed.List.empty_list(ValueArray)
        goal_sum = params[j][1] * params[j][3]
        tel = goal_sum / dec_ * 10
        if params[j][0] != 0:
            for k in range(Sno):
                final_sum = 0.0
                iter_ = 0
                t = 1

                vals_group = np.empty(0, dtype=nb.float64)

                while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
                    iter_ += 1
                    vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
                    final_sum = 0.0016 * np.sum(vals_group)
                    # final_sum = 0.0016 * np.sum(vals_group)  # (for small data volume)
                    final_sum = np.sum(vals_group ** 3)        # (for med or large data volume)
                    if iter_ == upd_:
                        t += 1
                        tel = t * tel

                # Perform an in-place deletion
                vals, gr = values_[j], vals_group
                cur = 0
                for l in range(vals.size):
                    found = False
                    for m in range(gr.size):
                        found |= vals[l] == gr[m]
                    if not found:
                        # Keep the value (delete it otherwise)
                        vals[cur] = vals[l]
                        cur += 1
                values_[j] = vals[:cur]

                Rand_vals.append(vals_group)
        else:
            for k in range(Sno):
                Rand_vals.append(np.empty(0, dtype=nb.float64))
        final_dict["R" + str(i)] = Rand_vals
    return final_dict

Notez que l'implémentation de remplacement de np.isin est assez naïf mais il fonctionne assez bien en pratique sur votre exemple d'entrée.

La fonction peut être appelée de la manière suivante :

nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)

Enfin, le dictionnaire doit être reconverti en objets Python de base :

finalDict = dict()
for key,value in nbFinalDict.items():
    finalDict[key] = list(value)

Cette implémentation est rapide pour les petites entrées mais pas pour les grandes puisque np.random.choice prend presque tout le temps (>96%). Le fait est que cette fonction est clairement non optimal lorsque le nombre d'articles demandés est faible (ce qui est votre cas). En effet, il s'exécute étonnamment en temps linéaire du tableau d'entrée et non en temps linéaire du nombre d'éléments demandés.


Autres optimisations

L'algorithme peut être complètement réécrit pour n'extraire que 12 éléments aléatoires et les écarter du tableau principal du courant d'une manière beaucoup plus efficace. L'idée est de permuter n (petit échantillon cible) à la fin du tableau avec d'autres éléments situés à des endroits aléatoires, puis vérifier la somme, répéter ce processus jusqu'à ce qu'une condition soit remplie, et enfin extraire la vue jusqu'au dernier n avant de redimensionner la vue afin d'écarter les derniers éléments. Tout ceci peut être fait dans O(n) plutôt que O(m) le moment où m est la taille du tableau principal du courant avec n << m (par exemple, 12 VS 20_000). Il peut également être calculé sans aucune allocation coûteuse. Voici le code résultant :

@nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
def swap(arr, i, j):
    arr[i], arr[j] = arr[j], arr[i]

@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
    final_dict = nb.typed.Dict.empty(String, FinalDictValue)
    for i, j in enumerate(values_.keys()):
        Rand_vals = nb.typed.List.empty_list(ValueArray)
        goal_sum = params[j][1] * params[j][3]
        tel = goal_sum / dec_ * 10
        values = values_[j]
        n = params[j][0]

        if n != 0:
            for k in range(Sno):
                final_sum = 0.0
                iter_ = 0
                t = 1

                m = values.size
                assert n <= m
                group = values[-n:]

                while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
                    iter_ += 1

                    # Swap the group view with other random items
                    for i in range(m - n, m):
                        swap(values, i, np.random.randint(0, m))

                    # For small data volume:
                    # final_sum = 0.0016 * np.sum(group)

                    # For med/large data volume
                    final_sum = 0.0
                    for v in group:
                        final_sum += v ** 3

                    if iter_ == upd_:
                        t += 1
                        tel *= t

                assert iter_ > 0
                values = values[:m-n]
                Rand_vals.append(group)
        else:
            for k in range(Sno):
                Rand_vals.append(np.empty(0, dtype=nb.float64))
        final_dict["R" + str(i)] = Rand_vals
    return final_dict

En plus d'être plus rapide, cette mise en œuvre a l'avantage d'être également plus simple. Les résultats sont assez similaires à ceux de l'implémentation précédente, même si le caractère aléatoire rend la vérification des résultats délicate (d'autant plus que cette fonction n'utilise pas la même méthode pour choisir l'échantillon aléatoire). Notez que cette implémentation ne supprime pas les éléments dans values qui sont dans group par rapport à la précédente (ce qui n'est probablement pas souhaité).


Point de repère

Voici les résultats de la dernière implémentation sur ma machine (hors temps de compilation et de conversion) :

Provided small input (embedded in the question):
 - Initial code:   42.71 ms
 - Numba code:      0.11 ms

Medium input:
 - Initial code:   3481 ms
 - Numba code:       11 ms

Large input:
 - Initial code:   6728 ms
 - Numba code:       20 ms

Notez que le temps de conversion prend à peu près le même temps que le calcul.

Cette dernière mise en œuvre est 316~388 fois plus rapide que le code initial sur de petites entrées.


Notas

Notez que le temps de compilation prend quelques secondes en raison des types dict et lists.

Notez que même s'il est possible de paralléliser l'implémentation, seule la boucle la plus englobante peut être parallélisée. Le fait est qu'il n'y a que peu d'éléments à calculer et que le temps est déjà assez faible (pas le meilleur cas pour le multithreading). <-- De plus, la création de nombreuses matrices temporaires (créées par rng.choice ) fera certainement que la boucle parallèle ne sera pas bien dimensionnée de toute façon. --> De plus, la liste/dict ne peut pas être écrite à partir de plusieurs threads en toute sécurité, il faut donc utiliser des tableaux Numpy dans toute la fonction pour pouvoir le faire (ou ajouter des conversions supplémentaires qui sont déjà coûteuses). De plus, le parallélisme Numba tend à augmenter significativement le temps de compilation qui est déjà important. Enfin, le résultat sera moins déterministe puisque chaque thread Numba possède sa propre graine de générateur de nombres aléatoires et que les éléments calculés par les threads ne peuvent pas être prédits avec la fonction prange (dépendant du runtime parallèle choisi sur la plateforme cible). Notez que dans Numpy il y a une graine globale par défaut utilisée par les fonctions aléatoires habituelles (méthode dépréciée) et que les objets RNG ont leur propre graine (nouvelle méthode préférée).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X