2 votes

Comment puis-je mettre à jour une liste de listes très rapidement de manière thread-safe? - python

Je suis en train d'écrire un script pour ajouter une "colonne" à une liste de listes Python à 500 Hz. Voici le code qui génère des données de test et les fait passer à travers un thread séparé :

# fileA
import random, time, threading
data = [[] for _ in range(4)]  # liste avec 4 listes vides (4 lignes)
column = [random.random() for _ in data]  # colonne synthétique de données
def synthesize_data():
    while True:
        for x,y in zip(data,column):
            x.append(y)
        time.sleep(0.002)  # équivalent à 500 Hz
t1 = threading.Thread(target=synthesize_data).start()
# exemple de données
# [[0.61523098235, 0.61523098235, 0.61523098235, ... ],
# [0.15090349809, 0.15090349809, 0.15090349809, ... ],
# [0.92149878571, 0.92149878571, 0.92149878571, ... ],
# [0.41340918409, 0.41340918409, 0.41340918409, ... ]]

# fileB (dans Jupyter Notebook)
[1] import fileA, copy

[2] # obtenir une copie des données à cet instant.
    data = copy.deepcopy(fileA.data)
    for row in data:
        print len(row)

Si vous exécutez la cellule [2] dans le fichierB, vous devriez voir que les longueurs des "lignes" dans data ne sont pas égales. Voici un exemple de sortie lorsque j'exécute le script :

8784
8786
8787
8787

Je pensais peut-être que je récupérais les données au milieu de la boucle for, mais cela suggérerait que les longueurs seraient décalées d'au plus 1. Les différences deviennent de plus en plus importantes avec le temps. Ma question : pourquoi l'ajout rapide de colonnes à une liste de listes est-il instable ? Est-il possible de rendre ce processus plus stable ?

Vous pourriez me suggérer d'utiliser quelque chose comme Pandas, mais je veux utiliser des listes Python en raison de leur avantage de vitesse (le code doit être aussi rapide que possible). J'ai testé la boucle for, la fonction map() et le cadre de données Pandas. Voici mon code de test (dans Jupyter Notebook) :

# Code de configuration
import pandas as pd
import random
channels = ['C3','C4','C5','C2']
a = [[] for _ in channels]
b = [random.random() for _ in a]
def add_col((x,y)):
    x.append(y);
df = pd.DataFrame(index=channels)
b_pandas = pd.Series(b, index=df.index)

%timeit for x,y in zip(a,b): x.append(y)  # 1000000 boucles, meilleur de 3 : 1,32 µs par boucle
%timeit map(add_col, zip(a,b))  # 1000000 boucles, meilleur de 3 : 1,96 µs par boucle
%timeit df[0] = b  # 10000 boucles, meilleur de 3 : 82,8 µs par boucle
%timeit df[0] = b_pandas  # 10000 boucles, meilleur de 3 : 58,4 µs par boucle

Vous pourriez également suggérer que j'ajoute les échantillons à data en tant que lignes, puis que je transpose au moment de l'analyse. Je préférerais ne pas le faire aussi dans l'intérêt de la vitesse. Ce code sera utilisé dans une interface cerveau-ordinateur, où l'analyse se fait dans une boucle. La transposition devrait également se faire dans la boucle, ce qui ralentirait le processus à mesure que les données augmentent.

édition : changement de titre de "Why is quickly adding columns to a list of lists unstable - python" à "How can I update a list of lists very quickly in a thread-safe manner? - python" pour rendre le titre plus descriptif du message et de la réponse.

5voto

Martijn Pieters Points 271458

L'opération deepcopy() copie les listes tels qu'elles sont modifiées par un autre thread, et chaque opération de copie prend un petit peu de temps (plus longtemps les listes sont grandes). Ainsi, entre la copie de la première des 4 listes et la copie de la seconde, l'autre thread a ajouté 2 éléments, ce qui indique qu'une copie d'une liste de 8784 éléments prend entre 0,002 et 0,004 secondes.

C'est parce qu'il n'y a rien qui empêche le threading de passer de l'exécution de synthesize_data() à l'appel de deepcopy.copy(). En d'autres termes, votre code n'est tout simplement pas thread-safe.

Vous devriez coordonner entre vos deux threads; en utilisant un verrou par exemple:

Dans fileA:

# ...
datalock = threading.RLock()
# ...

def synthesize_data():
    while True:
        with datalock:
            for x,y in zip(data,column):
                x.append(y)
            time.sleep(0.002)  # équivalent à 500 Hz

et dans fileB:

with fileA.datalock:
    data = copy.deepcopy(fileA.data)
    for row in data:
        print len(row)

Cela garantit que la copie ne se produit que lorsque le thread dans fileA n'essaie pas d'ajouter plus aux listes.

L'utilisation de verrouillages ralentira vos opérations; je soupçonne que les opérations d'assignation dans pandas sont déjà soumises à des verrous pour les rendre sûres au threading.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X