27 votes

Comment puis-je reproduire de manière fiable les conditions de course dans ce code Python?

Contexte

Récemment, j'ai posté une classe de minuterie pour examen sur Code Review. J'avais le pressentiment qu'il y avait des bugs de concurrence car j'avais vu un test unitaire échouer une fois, mais je n'avais pas pu reproduire l'échec. C'est pourquoi j'ai posté sur code review.

J'ai reçu des commentaires formidables mettant en évidence diverses conditions de concurrence dans le code. (Je pensais) avoir compris le problème et la solution, mais avant d'apporter des corrections, je voulais exposer les bugs avec un test unitaire. Lorsque j'ai essayé, j'ai réalisé que c'était difficile. Diverses réponses sur Stack Exchange suggéraient que je devrais contrôler l'exécution des threads pour exposer le(s) bug(s) et que tout timing artificiel ne serait pas nécessairement portable sur une autre machine. Cela semblait être une complexité accidentelle au-delà du problème que j'essayais de résoudre.

À la place, j'ai essayé d'utiliser le meilleur outil d'analyse statique (SA) pour Python, PyLint, pour voir s'il repérerait certains des bugs, mais il n'a pas pu le faire. Pourquoi un humain peut-il trouver les bugs à travers une revue de code (essentiellement de l'AS), mais un outil d'AS ne le pourrait pas?

Effrayé à l'idée d'essayer de faire fonctionner Valgrind avec Python (ce qui semblait être du yak-shaving), j'ai décidé de m'attaquer à la correction des bugs sans les reproduire d'abord. Maintenant, je suis dans une impasse.

Voici maintenant le code.

from threading import Timer, Lock
from time import time

class NotRunningError(Exception): pass
class AlreadyRunningError(Exception): pass

class KitchenTimer(object):
    '''
    Modélise de manière lâche un minuteur de cuisine mécanique avec les différences suivantes:
        Vous pouvez démarrer le minuteur avec une durée arbitraire (par exemple 1,2 secondes).
        Le minuteur appelle une fonction donnée quand le temps est écoulé.
        La requête du temps restant a une précision de 0,1 seconde.
    '''

    PRECISION_NUM_DECIMAL_PLACES = 1
    RUNNING = "RUNNING"
    STOPPED = "STOPPED"
    TIMEUP  = "TIMEUP"

    def __init__(self):
        self._stateLock = Lock()
        with self._stateLock:
            self._state = self.STOPPED
            self._timeRemaining = 0

    def start(self, duration=1, whenTimeup=None):
        '''
        Démarre le minuteur pour compter à rebours à partir de la durée donnée et appelle whenTimeup quand le temps est écoulé.
        '''
        with self._stateLock:
            if self.isRunning():
                raise AlreadyRunningError
            else:
                self._state = self.RUNNING
                self.duration = duration
                self._userWhenTimeup = whenTimeup
                self._startTime = time()
                self._timer = Timer(duration, self._whenTimeup)
                self._timer.start()

    def stop(self):
        '''
        Arrête le minuteur, empêchant le rappel de whenTimeup.
        '''
        with self._stateLock:
            if self.isRunning():
                self._timer.cancel()
                self._state = self.STOPPED
                self._timeRemaining = self.duration - self._elapsedTime()
            else:
                raise NotRunningError()

    def isRunning(self):
        return self._state == self.RUNNING

    def isStopped(self):
        return self._state == self.STOPPED

    def isTimeup(self):
        return self._state == self.TIMEUP

    @property
    def timeRemaining(self):
        if self.isRunning():
            self._timeRemaining = self.duration - self._elapsedTime()
        return round(self._timeRemaining, self.PRECISION_NUM_DECIMAL_PLACES)

    def _whenTimeup(self):
        with self._stateLock:
            self._state = self.TIMEUP
            self._timeRemaining = 0
            if callable(self._userWhenTimeup):
                self._userWhenTimeup()

    def _elapsedTime(self):
        return time() - self._startTime

Question

Dans le contexte de cet exemple de code, comment puis-je exposer les conditions de concurrence, les corriger et prouver qu'elles sont corrigées?

Points supplémentaires

Points supplémentaires pour un cadre de test adapté à d'autres implémentations et problèmes, plutôt que spécifiquement à ce code.

Conclusion

Ma conclusion est que la solution technique pour reproduire les conditions de concurrence identifiées est de contrôler la synchronisation de deux threads pour garantir qu'ils s'exécutent dans l'ordre qui exposera un bug. Le point important ici est qu'il s'agit de conditions de concurrence déjà identifiées. La meilleure façon que j'ai trouvée pour identifier les conditions de concurrence est de soumettre votre code à une revue de code et d'encourager des personnes plus expertes à l'analyser.

5voto

Tim Pierce Points 2887

Traditionnellement, la création de conditions de course dans du code multithread est réalisée avec des sémaphores, vous pouvez donc obliger un thread à attendre qu'un autre thread atteigne une certaine condition de bord avant de continuer.

Par exemple, votre objet a du code pour vérifier que start n'est pas appelé si l'objet est déjà en cours d'exécution. Vous pourriez forcer cette condition pour vous assurer qu'elle se comporte comme prévu en faisant quelque chose comme ceci :

  • démarrer un KitchenTimer
  • bloquer le minuteur sur un sémaphore alors qu'il est en mode en cours d'exécution
  • démarrer le même minuteur dans un autre thread
  • attraper AlreadyRunningError

Pour faire certaines de ces choses, vous devrez peut-être étendre la classe KitchenTimer. Les tests unitaires formels utilisent souvent des objets fictifs définis pour bloquer aux moments critiques. Les objets mock sont un sujet plus vaste que je ne peux traiter ici, mais en recherchant "python mock object" sur Google, vous trouverez beaucoup de documentation et de nombreuses implémentations parmi lesquelles choisir.

Voici comment vous pourriez forcer votre code à lancer AlreadyRunningError :

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def start(self, duration=1, whenTimeUp=None):
        KitchenTimer.start(self, duration, whenTimeUp)
        with self._runningLock:
            print "attente sur _runningLock"
            self._runningLock.wait()

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

minuteur = TestKitchenTimer()

# Démarrer le minuteur dans un sous-thread. Ce thread bloquera dès que
# il est démarré.
thread_1 = threading.Thread(target=minuteur.start, args=(10, None))
thread_1.start()

# Tenter de démarrer le minuteur dans un second thread, ce qui le force à lancer
# une AlreadyRunningError.
essayer:
    thread_2 = threading.Thread(target=minuteur.start, args=(10, None))
    thread_2.start()
sauf AlreadyRunningError:
    print "AlreadyRunningError"
    minuteur.resume()
    minuteur.stop()

En parcourant le code, identifiez certaines des conditions limites que vous souhaitez tester, pensez ensuite à l'endroit où vous devez mettre en pause le minuteur pour forcer cette condition à se produire, et ajoutez des Conditions, des Sémaphores, des Événements, etc. pour que cela se produise. Par exemple, que se passe-t-il si, juste au moment où le minuteur exécute le rappel whenTimeUp, un autre thread essaie de l'arrêter ? Vous pouvez forcer cette condition en faisant attendre le minuteur dès qu'il est entré _whenTimeUp :

import threading

class TestKitchenTimer(KitchenTimer):

    _runningLock = threading.Condition()

    def _whenTimeup(self):
        with self._runningLock:
            self._runningLock.wait()
        KitchenTimer._whenTimeup(self)

    def resume(self):
        with self._runningLock:
            self._runningLock.notify()

def TimeupCallback():
    print "TimeupCallback was called"

minuteur = TestKitchenTimer()

# Le thread du minuteur bloquera lorsque le minuteur expirera, mais avant que le rappel ne soit invoqué.
thread_1 = threading.Thread(target=minuteur.start, args=(1, TimeupCallback))
thread_1.start()
sleep(2)

# Le minuteur est maintenant bloqué. Dans le thread parent, nous l'arrêtons.
minuteur.stop()
print "minuteur est arrêté : %r" % minuteur.isStopped()

# Maintenant, autorisez le thread de décompte à reprendre.
minuteur.resume()

Créer une sous-classe de la classe que vous souhaitez tester n'est pas une méthode optimale pour l'instrumenter pour les tests : vous devrez en grande partie remplacer toutes les méthodes pour tester les conditions de course dans chacune d'elles, et à ce stade, on peut argumenter que vous ne testez pas vraiment le code d'origine. Au lieu de cela, vous trouverez peut-être plus propre d'ajouter les sémaphores directement dans l'objet KitchenTimer mais initialisés par défaut à None, et de faire en sorte que vos méthodes vérifient si testRunningLock n'est pas None : avant d'acquérir ou d'attendre sur le verrou. Vous pouvez ensuite forcer des courses sur le code réel que vous soumettez.

La lecture de quelques tutoriels sur les cadres de simulation Python peut être utile. En fait, je ne suis pas sûr que les mocks seraient utiles pour tester ce code : il est presque entièrement autonome et ne dépend pas de nombreux objets externes. Mais les tutoriels sur les mocks touchent parfois à des problèmes comme ceux-ci. Je n'ai utilisé aucun d'entre eux, mais la documentation sur ces derniers semble être un bon point de départ :

5voto

siebz0r Points 3960

La solution la plus courante pour tester du code (non)sûr en termes de threads est de démarrer de nombreux threads et d'espérer le meilleur. Le problème que j'ai, et que d'autres peuvent imaginer, avec cela est qu'elle repose sur le hasard et rend les tests "lourds".

Comme je suis tombé là-dessus il y a un certain temps, j'ai préféré privilégier la précision plutôt que la force brute. Le résultat est un morceau de code de test pour provoquer des conditions de course en laissant les threads courir au coude à coude.

Exemple de code de course

spam = []

def set_spam():
    spam[:] = foo()
    use(spam)

Si set_spam est appelé à partir de plusieurs threads, une condition de course existe entre la modification et l'utilisation de spam. Essayons de le reproduire de manière cohérente.

Comment provoquer des conditions de course

class TriggeredThread(threading.Thread):
    def __init__(self, sequence=None, *args, **kwargs):
        self.sequence = sequence
        self.lock = threading.Condition()
        self.event = threading.Event()
        threading.Thread.__init__(self, *args, **kwargs)

    def __enter__(self):
        self.lock.acquire()
        while not self.event.is_set():
            self.lock.wait()
        self.event.clear()

    def __exit__(self, *args):
        self.lock.release()
        if self.sequence:
            next(self.sequence).trigger()

    def trigger(self):
        with self.lock:
            self.event.set()
            self.lock.notify()

Ensuite, pour démontrer l'utilisation de ce thread :

spam = []  # Utilisez une liste pour partager des valeurs entre les threads.
results = []  # Enregistrer les résultats.

def set_spam():
    thread = threading.current_thread()
    with thread:  # Acquiert le verrou.
        # Définir 'spam' comme nom du thread
        spam[:] = [thread.name]
    # Le thread 'libère' le verrou lors de la sortie du contexte.
    # Le thread suivant est déclenché et ce thread attend un déclenchement.
    with thread:
        # Comme chaque thread écrase le contenu de la liste 'spam',
        # ceci ne devrait être vrai que pour le dernier thread.
        results.append(spam == [thread.name])

threads = [
    TriggeredThread(name='a', target=set_spam),
    TriggeredThread(name='b', target=set_spam),
    TriggeredThread(name='c', target=set_spam)]

# Créer une séquence décalée de threads et la partager entre les threads.
thread_sequence = itertools.cycle(threads[1:] + threads[:1])
for thread in threads:
    thread.sequence = thread_sequence

# Démarrer chaque thread
[thread.start() for thread in threads]
# Déclencher le premier thread.
# Ce thread déclenchera le thread suivant, et ainsi de suite.
threads[0].trigger()
# Attendre que chaque thread se termine.
[thread.join() for thread in threads]
# Le dernier thread 'a remporté la course' en écrasant la valeur
# de 'spam', donc [False, False, True].
# Si set_spam était sûr en termes de thread, tous les résultats seraient vrais.
assert results == [False, False, True], "condition de course déclenchée"
assert results == [True, True, True], "le code est sûr en termes de thread"

Je pense avoir suffisamment expliqué cette construction pour que vous puissiez l'implémenter dans votre propre situation. Je pense que cela convient assez bien à la section des 'points supplémentaires' :

points supplémentaires pour un cadre de test adapté à d'autres implémentations et problèmes plutôt que spécifiquement à ce code.

Résolution des conditions de course

Variables partagées

Chaque problème de threading est résolu de manière spécifique. Dans l'exemple ci-dessus, j'ai provoqué une condition de course en partageant une valeur entre les threads. Des problèmes similaires peuvent survenir lors de l'utilisation de variables globales, telles qu'un attribut de module. La clé pour résoudre de tels problèmes peut être d'utiliser un stockage local au thread :

# Le stockage local au thread est un global.
# Cela peut sembler étrange au début, mais il n'est en fait pas partagé entre les threads.
data = threading.local()
data.spam = []  # Cette liste n'existe que dans ce thread.
results = []  # Les résultats sont cependant partagés.

def set_spam():
    thread = threading.current_thread()
    # 'get' ou définissez la liste 'spam'. Cela crée en fait une nouvelle liste.
    # Si la liste était partagée entre les threads, cela provoquerait une condition de course.
    data.spam = getattr(data, 'spam', [])
    with thread:
        data.spam[:] = [thread.name]
    with thread:
        results.append(data.spam == [thread.name])

# Démarrer les threads comme dans l'exemple ci-dessus.

assert all(results)  # Tous les résultats devraient être vrais.

Lectures/écritures concurrentes

Un problème de threading courant est le problème de plusieurs threads lisant et/ou écrivant dans un conteneur de données simultanément. Ce problème est résolu en mettant en œuvre un verrou de lecture/écriture. L'implémentation réelle d'un verrou de lecture/écriture peut différer. Vous pouvez choisir un verrou de lecture en premier, un verrou d'écriture en premier ou simplement au hasard.

Je suis sûr qu'il existe des exemples décrivant de telles techniques de verrouillage. Je pourrais écrire un exemple plus tard, car cette réponse est déjà assez longue. ;-)

Notes

Jetez un coup d'œil à la documentation du module threading et expérimentez un peu avec. Comme chaque problème de threading est différent, différentes solutions s'appliquent.

En parlant de threading, jetez un œil au GIL Python (Global Interpreter Lock). Il est important de noter que le threading n'est peut-être pas la meilleure approche pour optimiser les performances (mais ce n'est pas votre objectif). J'ai trouvé cette présentation assez bonne : https://www.youtube.com/watch?v=zEaosS1U5qY

4voto

perreal Points 47912

Vous pouvez le tester en utilisant un grand nombre de threads :

import sys, random, thread
def timeup():
    sys.stdout.write("Timer:: Up %f" % time())

def trdfunc(kt, tid):
    while True:
        sleep(1)
        if not kt.isRunning():
            if kt.start(1, timeup):
                sys.stdout.write("[%d]: a commencé\n" % tid)
        else:
            if random.random() < 0.1:
                kt.stop()
                sys.stdout.write("[%d]: s'est arrêté\n" % tid)
        sys.stdout.write("[%d] reste %f\n" % (tid, kt.timeRemaining))

kt = KitchenTimer()
kt.start(1, timeup)
for i in range(1, 100):
    thread.start_new_thread ( trdfunc, (kt, i) )
trdfunc(kt, 0)

Un couple de problèmes que je vois :

  • Lorsqu'un thread voit le chronomètre comme n'étant pas en cours d'exécution et essaye de le démarrer, le code lève généralement une exception en raison d'un changement de contexte entre le test et le démarrage. Je pense qu'une exception est trop. Ou vous pouvez avoir une fonction testAndStart atomique

  • Un problème similaire se produit avec stop. Vous pouvez implémenter une fonction testAndStop.

  • Même ce code de la fonction timeRemaining:

    if self.isRunning():
       self._timeRemaining = self.duration - self._elapsedTime()

    Nécessite une sorte d'atomicité, peut-être vous avez besoin de saisir un verrou avant de tester isRunning

Si vous prévoyez de partager cette classe entre les threads, vous devez résoudre ces problèmes.

3voto

Lazin Points 4481

En général - ce n'est pas une solution viable. Vous pouvez reproduire cette condition de concurrence en utilisant un débogueur (définir des points d'arrêt à certains endroits dans le code, puis, lorsque l'un des points d'arrêt est atteint - geler le thread et exécuter le code jusqu'à ce qu'il atteigne un autre point d'arrêt, puis geler ce thread et dégeler le premier thread, vous pouvez imbriquer l'exécution des threads de n'importe quelle manière en utilisant cette technique).

Le problème est - plus vous avez de threads et de code, plus ils auront de façons d'imbriquer les effets secondaires. En fait - cela va croître de manière exponentielle. Il n'y a pas de solution viable pour le tester en général. C'est possible uniquement dans certains cas simples.

Les solutions à ce problème sont bien connues. Écrivez du code qui est conscient de ses effets secondaires, contrôlez les effets secondaires avec des primitives de synchronisation comme les verrous, les sémaphores ou les files d'attente ou utilisez des données immuables si c'est possible.

Peut-être que la façon la plus pratique est d'utiliser des vérifications en temps d'exécution pour forcer l'ordre des appels correct. Par exemple (pseudocode):

class ObjetConcurrent:
    def __init__(self):
        self.__cnt = 0
        ...

    def isReadyAndLocked(self):
        acquérir_verrou_objet
            if self.__cnt % 2 != 0:
                # un autre thread est prêt à démarrer le travail
                return False
            if self.__is_ready:
                self.__cnt += 1
                return True
            # Le travail est en cours ou n'est pas encore prêt
            return False
        libérer_verrou_objet

    def doJobAndRelease(self):
        acquérir_verrou_objet
            if self.__cnt % 2 != 1:
                raise ConditionDeCourseDétectée("Ordre incorrect")
            self.__cnt += 1
            effectuer_travail()
        libérer_verrou_objet

Ce code jettera une exception si vous ne vérifiez pas isReadyAndLock avant d'appeler doJobAndRelease. Cela peut être testé facilement en n'utilisant qu'un seul thread.

obj = ObjetConcurrent()
...
# utilisation correcte
if obj.isReadyAndLocked()
    obj.doJobAndRelease()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X