8 votes

Additionner efficacement les produits de matrices complexes avec Numpy

J'ai une matrice, X pour laquelle je calcule une somme pondérée de produits matriciels intermédiaires. Voici un exemple minimal reproductible :

import numpy as np

random_state = np.random.RandomState(1)
n = 5
p = 10

X = random_state.rand(p, n) # 10x5
X_sum = np.zeros((n, n)) # 5x5

# The length of weights are not related to X's dims,
# but will always be smaller
y = 3
weights = random_state.rand(y)

for k in range(y):
    X_sum += np.dot(X.T[:, k + 1:],
                    X[:p - (k + 1), :]) * weights[k]

Cela fonctionne bien et produit les résultats que j'attends. Cependant, comme la taille de n y y grandissent (dans les centaines), cela devient extrêmement coûteux, car le calcul répétitif des produits matriciels n'est pas exactement efficace...

Il existe cependant un schéma évident dans la façon dont les produits sont calculés :

First iteration

Second iteration

Vous pouvez voir qu'au fur et à mesure des itérations, la tranche de la colonne de départ en Xt se déplace vers la droite, tandis que la rangée finale dans X se déplace vers le haut. Voici à quoi ressemblerait la Nième itération :

Nth iteration

Cela signifie en fait qu'un sous-ensemble des mêmes valeurs est multiplié de manière répétée (cf. modifier 2 ), ce qui me semble être une opportunité à exploiter... (c'est-à-dire si je devais calculer manuellement le produit en une seule passe).

Mais j'espère ne pas avoir à faire quoi que ce soit manuellement et qu'il y a peut-être un moyen de réaliser toute cette boucle de manière plus élégante avec Numpy.

Edit 1

Un ensemble de chiffres réalistes :

n = 400
p = 2000
y = 750

Edit 2

Pour répondre au commentaire :

Pourriez-vous expliquer quelles valeurs sont multipliées de manière répétée ?

Considérons le tableau suivant :

n = p = 5
X = np.arange(25).reshape(p, n)

Para k=0 le premier produit se situera entre A y B :

k = 0
A = X.T[:, k + 1:]
B = X[:p - (k + 1), :]
>>> A
array([[ 5, 10, 15, 20],
       [ 6, 11, 16, 21],
       [ 7, 12, 17, 22],
       [ 8, 13, 18, 23],
       [ 9, 14, 19, 24]])
>>> B
array([[ 0,  1,  2,  3,  4],
       [ 5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14],
       [15, 16, 17, 18, 19]])

Et quand k=1 :

k = 1
>>> A
array([[10, 15, 20],
       [11, 16, 21],
       [12, 17, 22],
       [13, 18, 23],
       [14, 19, 24]])
>>> B
array([[ 0,  1,  2,  3,  4],
       [ 5,  6,  7,  8,  9],
       [10, 11, 12, 13, 14]])

Ainsi, chaque produit matriciel suivant est en quelque sorte un sous-ensemble du produit précédent, si cela a un sens.

4voto

Charles Drotar Points 121

TLDR ; j'opterais pour l'utilisation de @Parfait de test_gen_sum sur la base d'une analyse comparative de diverses valeurs de n , p y y . Je garde l'ancienne réponse ici pour des raisons de continuité. .

Évaluer comment n , p , y influencer le choix de l'algorithme

Cette analyse est faite en utilisant les fonctions de @Parfait comme un moyen de déterminer s'il y a vraiment un meilleure solution ou s'il existe une famille de solutions basées sur des valeurs de n , p y y .

import numpy as np
import pytest # This code also requires the pytest-benchmark plugin

def test_for_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    for k in range(y):
        X_sum += np.dot(X.T[:, k + 1:],
                    X[:p - (k + 1), :]) * weights[k]

    return X_sum

def test_list_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    matrix_list = [np.dot(X.T[:, k + 1:],
                      X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = np.sum(matrix_list, axis=0)

    return X_sum

def test_reduce_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    matrix_list = [(X.T[:, k + 1:] @
                X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = reduce(lambda x,y: x + y, matrix_list)

    return X_sum

def test_concat_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    x_mat = np.concatenate([np.matmul(X.T[:, k + 1:],
                                  X[:p - (k + 1), :]) for k in range(y)])

    wgt_mat = np.concatenate([np.full((n,1), weights[k]) for k in range(y)])

    mul_res = x_mat * wgt_mat        
    X_sum = mul_res.reshape(-1, n, n).sum(axis=0)

    return X_sum

def test_matmul_sum(n, p, y):
    random_state = np.random.RandomState(1)
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)
    # Use list comprehension and np.matmul 
    matrices_list = [np.matmul(X.T[:, k + 1:],
                           X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    # Sum matrices in list of matrices to get the final result   
    X_sum = np.sum(matrices_list, axis=0)

    return X_sum

def test_gen_sum(n, p, y):
    random_state = np.random.RandomState(1)

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    weights = random_state.rand(y)

    matrix_gen = (np.dot(X.T[:, k + 1:],
                     X[:p - (k + 1), :]) * weights[k] for k in range(y))

    X_sum = sum(matrix_gen)

    return X_sum

parameters = [
    pytest.param(400, 800, 3)
    ,pytest.param(400, 2000, 3)
    ,pytest.param(400, 800, 750)
    ,pytest.param(400, 2000, 750)
]

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_for_sum(benchmark, n, p, y):
    benchmark(test_for_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_list_sum(benchmark, n, p, y):
     benchmark(test_list_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_reduce_sum(benchmark, n, p, y):
    benchmark(test_reduce_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_concat_sum(benchmark, n, p, y):
    benchmark(test_concat_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_matmul_sum(benchmark, n, p, y):
    benchmark(test_matmul_sum, n=n, p=p, y=y)

@pytest.mark.parametrize('n,p,y', parameters)
def test_test_gen_sum(benchmark, n, p, y):
    benchmark(test_gen_sum, n=n, p=p, y=y)
  • n=400 , p=800 , y=3 (100 itérations)

    • gagnant : test_gen_sum enter image description here
  • n=400 , p=2000 , y=3 (100 itérations)

    • gagnant : test_gen_sum enter image description here
  • n=400 , p=800 , y=750 (10 itérations)

    • gagnant : test_gen_sum enter image description here
  • n=400 , p=2000 , y=750 (10 itérations)

    • gagnant : test_gen_sum enter image description here

ANCIENNE RÉPONSE

Plus petit y valeurs

J'utiliserais certainement np.matmul au lieu de np.dot Cela vous permettra d'obtenir le plus gros gain de performance et en fait la documentation pour np.dot vous dirigera vers np.matmul pour la multiplication des tableaux 2D au lieu de np.dot .

J'ai testé les deux np.dot y np.matmul avec et sans compréhension de la liste et le pytest-benchmark les résultats sont ici :

y=3

Au fait, pytest-benchmark est assez astucieux et je le recommande vivement dans des cas comme celui-ci pour valider si une approche est vraiment performante.

Le simple fait d'utiliser la compréhension des listes a un effet presque négligeable sur les résultats de l'enquête. np.matmul résultats et un effet négatif sur np.dot (bien qu'il s'agisse d'une meilleure forme) dans le schéma des choses, mais la combinaison des deux changements a donné les meilleurs résultats en termes. Je tiens à vous avertir que l'utilisation des compréhensions de listes a tendance à augmenter l'écart-type du temps d'exécution, de sorte que vous pouvez constater des écarts plus importants dans les performances du temps d'exécution que si vous utilisiez simplement la fonction np.matmul .

Voici le code :

import numpy as np

def test_np_matmul_list_comprehension():
    random_state = np.random.RandomState(1)
    n = p = 1000
    X = np.arange(n * n).reshape(p, n)

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 3
    weights = [1, 1, 1]
    # Use list comprehension and np.matmul 
    matrices_list = [np.matmul(X.T[:, k + 1:],
                             X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    # Sum matrices in list of matrices to get the final result   
    X_sum = np.sum(matrices_list, axis=0)

Plus grand y valeurs

Pour des valeurs plus élevées de y vous feriez mieux de ne pas utiliser la compréhension de liste. Le temps d'exécution moyen/médian tend à être plus important pour les deux types d'applications. np.dot y np.matmul dans ces deux cas. Voici les pytest-benchmark résultats pour ( n=500 , p=5000 , y=750 ) :

enter image description here

C'est probablement exagéré, mais je préfère éviter d'être trop utile :).

3voto

Parfait Points 10832

Considérez les versions refactorisées suivantes par rapport aux appels itératifs de la somme en for boucle. Les nouvelles versions utilisant reduce générateur, et np.concatenate est légèrement plus rapide, mais reste comparable à for boucle. Chacune fonctionne avec n = 400, p = 800, y = 750 .

OP Version originale

import numpy as np

def test_for_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    for k in range(y):
        X_sum += np.dot(X.T[:, k + 1:],
                        X[:p - (k + 1), :]) * weights[k]

    return X_sum

Compréhension de listes avec np.dot

def test_list_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    matrix_list = [np.dot(X.T[:, k + 1:],
                          X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = sum(matrix_list)

    return X_sum

Version du générateur

def test_gen_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    matrix_gen = (np.dot(X.T[:, k + 1:],
                         X[:p - (k + 1), :]) * weights[k] for k in range(y))

    X_sum = sum(matrix_gen)

    return X_sum

Version réduite (en utilisant les nouvelles @ opérateur --sucre syntaxique-- à la place de np.matmul )

from functools import reduce

def test_reduce_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    matrix_list = [(X.T[:, k + 1:] @
                    X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    X_sum = reduce(lambda x,y: x + y, matrix_list)

    return X_sum

Version concaténée

def test_concat_sum():
    random_state = np.random.RandomState(1)
    n= 400
    p = 800

    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)

    x_mat = np.concatenate([np.matmul(X.T[:, k + 1:],
                                      X[:p - (k + 1), :]) for k in range(y)])

    wgt_mat = np.concatenate([np.full((n,1), weights[k]) for k in range(y)])

    mul_res = x_mat * wgt_mat        
    X_sum = mul_res.reshape(-1, n, n).sum(axis=0)

    return X_sum

Compréhension de liste avec np.matmul

def test_matmul_sum():
    random_state = np.random.RandomState(1)
    n = 400
    p = 800
    X = random_state.rand(p, n)
    X_sum = np.zeros((n, n))

    # The length of weights are not related to X's dims,
    # but will always be smaller
    y = 750
    weights = random_state.rand(y)
    # Use list comprehension and np.matmul 
    matrices_list = [np.matmul(X.T[:, k + 1:],
                               X[:p - (k + 1), :]) * weights[k] for k in range(y)]

    # Sum matrices in list of matrices to get the final result   
    X_sum = np.sum(matrices_list, axis=0)

    return X_sum

Horaires

import time

start_time = time.time()
res_for = test_for_sum()
print("SUM: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_list = test_list_sum()
print("LIST: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_gen = test_gen_sum()
print("GEN: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_reduce= test_reduce_sum()
print("REDUCE: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_concat = test_concat_sum()
print("CONCAT: {} seconds ---".format(time.time() - start_time))

start_time = time.time()
res_matmul = test_matmul_sum()
print("MATMUL: {} seconds ---".format(time.time() - start_time))

Tests d'égalité

print(np.array_equal(res_for, res_list))
# True
print(np.array_equal(res_for, res_gen))
# True
print(np.array_equal(res_for, res_reduce))
# True
print(np.array_equal(res_for, res_concat))
# True
print(np.array_equal(res_for, res_matmul))
# True

First Run

# SUM: 21.569773197174072 seconds ---
# LIST: 23.576102018356323 seconds ---
# GEN: 21.385253429412842 seconds ---
# REDUCE: 21.426464080810547 seconds ---
# CONCAT: 21.059731483459473 seconds ---
# MATMUL: 23.57494807243347 seconds ---

Second Run

# SUM: 21.6339168548584 seconds ---
# LIST: 19.767740488052368 seconds ---
# GEN: 23.86947798728943 seconds ---
# REDUCE: 19.880712032318115 seconds ---
# CONCAT: 20.761067152023315 seconds ---
# MATMUL: 23.55513620376587 seconds ---

Third Run

# SUM: 22.764745473861694 seconds ---
# LIST: 19.953850984573364 seconds ---
# GEN: 24.37714171409607 seconds ---
# REDUCE: 22.54508638381958 seconds ---
# CONCAT: 21.20585823059082 seconds ---
# MATMUL: 22.303589820861816 seconds ---

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X