10 votes

python MySQLDB query timeout

J'essaie d'imposer une limite de temps aux requêtes dans MySQLDB en python. Je n'ai aucun contrôle sur les requêtes, mais je dois m'assurer qu'elles ne s'exécutent pas au-delà d'une limite de temps fixée. J'ai essayé d'utiliser signal.SIGALRM pour interrompre l'appel à l'exécution, mais cela ne semble pas fonctionner. Le signal est envoyé, mais n'est pas pris en compte avant la fin de l'appel à l'exécution.

J'ai écrit un cas de test pour prouver ce comportement :

#!/usr/local/bin/python2.6

import time
import signal

from somewhere import get_dbc

class Timeout(Exception):
    """ Time Exceded """

def _alarm_handler(*args):
    raise Timeout

dbc = get_dbc()

signal.signal(signal.SIGALRM, _alarm_handler)
signal.alarm(1)

try:
    print "START:  ", time.time()
    dbc.execute("SELECT SLEEP(10)")
except Timeout:
    print "TIMEOUT!", time.time()'

Le "SELECT SLEEP(10)" simule une requête lente, mais je constate le même comportement avec une requête lente réelle.

Le résultat :

START:   1254440686.69
TIMEOUT! 1254440696.69

Comme vous pouvez le voir, il dort pendant 10 secondes, puis j'obtiens l'exception Timeout.

Questions :

  1. Pourquoi ne reçois-je le signal qu'après la fin de l'exécution ?
  2. Existe-t-il un autre moyen fiable de limiter le temps d'exécution des requêtes ?

8voto

Alex Martelli Points 330805

La solution tordue de @nosklo est élégant et fonctionnel, mais si vous voulez éviter la dépendance à l'égard de twisted, la tâche est encore réalisable, par exemple :

import multiprocessing

def query_with_timeout(dbc, timeout, query, *a, **k):
  conn1, conn2 = multiprocessing.Pipe(False)
  subproc = multiprocessing.Process(target=do_query,
                                    args=(dbc, query, conn2)+a, 
                                    kwargs=k)
  subproc.start()
  subproc.join(timeout)
  if conn1.poll():
    return conn1.recv()
  subproc.terminate()
  raise TimeoutError("Query %r ran for >%r" % (query, timeout))

def do_query(dbc, query, conn, *a, **k):
  cu = dbc.cursor()
  cu.execute(query, *a, **k)
  return cu.fetchall()

2voto

nosklo Points 75862

Utilisation adbapi . Il permet d'effectuer un appel à la base de données de manière asynchrone.

from twisted.internet import reactor
from twisted.enterprise import adbapi

def bogusQuery():
    return dbpool.runQuery("SELECT SLEEP(10)")

def printResult(l):
    # function that would be called if it didn't time out
    for item in l:
        print item

def handle_timeout():
    # function that will be called when it timeout
    reactor.stop()

dbpool = adbapi.ConnectionPool("MySQLdb", user="me", password="myself", host="localhost", database="async")
bogusQuery().addCallback(printResult)
reactor.callLater(4, handle_timeout)
reactor.run()

2voto

shurik Points 86

J'ai essayé d'utiliser signal.SIGALRM pour interrompre l'appel à l'exécution, mais cela ne semble pas fonctionner. Le signal est envoyé, mais n'est pas pris en compte avant la fin de l'appel à l'exécution.

La bibliothèque mysql gère les appels système interrompus en interne, de sorte que vous ne verrez pas les effets secondaires de SIGALRM avant la fin de l'appel API (à moins de tuer le thread ou le processus en cours).

Vous pouvez essayer Parcheando MySQL-Python et utiliser l'option MYSQL_OPT_READ_TIMEOUT (ajoutée dans mysql 5.0.25).

1voto

John Millikin Points 86775

Pourquoi ne reçois-je le signal qu'après la fin de l'exécution ?

La requête est exécutée par une fonction C, qui bloque l'exécution de la VM Python jusqu'à ce qu'elle revienne.

Existe-t-il un autre moyen fiable de limiter le temps d'exécution des requêtes ?

Il s'agit (IMO) d'une solution très laide, mais elle fait travail. Vous pouvez exécuter la requête dans un processus séparé (soit via fork() ou le multiprocessing module ). Exécutez la minuterie d'alarme dans votre processus principal, et lorsque vous la recevez, envoyez un SIGINT o SIGKILL au processus enfant. Si vous utilisez multiprocessing vous pouvez utiliser la fonction Process.terminate() méthode.

1voto

Vyktor Points 10531

Notes génériques

J'ai rencontré le même problème dernièrement avec plusieurs conditions à remplir :

  • la solution doit être sûre pour les threads
  • plusieurs connexions à la base de données à partir de la même machine peuvent être actives en même temps, ce qui a pour effet de tuer la seule connexion/requête.
  • l'application contient des connexions à de nombreuses bases de données différentes - un gestionnaire portable pour chaque hôte de base de données

Nous avions la disposition de classe suivante ( Malheureusement, je ne peux pas publier de sources réelles. ) :

class AbstractModel: pass 
class FirstDatabaseModel(AbstractModel): pass # Connection to one DB host
class SecondDatabaseModel(AbstractModel): pass # Connection to one DB host

Et a créé plusieurs fils de discussion pour chaque modèle.


Solution Python 3.2

Dans notre application un modèle = une base de données . J'ai donc créé " raccordement au service "pour chaque modèle (de sorte que nous puissions exécuter KILL en connexion parallèle). Par conséquent, si une instance de FirstDatabaseModel a été créée, 2 connexions à la base de données ont été créées ; si 5 instances ont été créées, 6 connexions ont été utilisées :

class AbstractModel:
    _service_connection = None # Formal declaration

    def __init__(self):
        ''' Somehow load config and create connection
        '''
        self.config = # ...
        self.connection = MySQLFromConfig(self.config)
        self._init_service_connection()

        # Get connection ID (pseudocode)
        self.connection_id = self.connection.FetchOneCol('SELECT CONNECTION_ID()') 

    def _init_service_connection(self):
        ''' Initialize one singleton connection for model
        '''
        cls = type(self)
        if cls._service_connection is not None:
            return

        cls._service_connection = MySQLFromConfig(self.config)

Il nous faut maintenant un tueur :

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)

    # Do your own connection check and renewal
    type(self)._service_connection.execute(sql)

Remarque : connection.execute = créer un curseur, exécuter, fermer le curseur.

Et sécuriser le thread killer en utilisant threading.Lock :

def _init_service_connection(self):
    ''' Initialize one singleton connection for model
    '''
    cls = type(self)
    if cls._service_connection is not None:
        return

    cls._service_connection = MySQLFromConfig(self.config)
    cls._service_connection_lock = threading.Lock()

def _kill_connection(self):
    # Add your own mysql data escaping
    sql = 'KILL CONNECTION {}'.format(self.connection_id)
    cls = type(self)

    # Do your own connection check and renewal
    try:
        cls._service_connection_lock.acquire()    
        cls._service_connection.execute(sql)
    finally:
        cls._service_connection_lock.release()

Enfin, ajoutez la méthode d'exécution temporelle en utilisant threading.Timer :

def timed_query(self, sql, timeout=5):
    kill_query_timer = threading.Timer(timeout, self._kill_connection)
    kill_query_timer.start()

    try:
        self.connection.long_query() 
    finally:
        kill_query_timer.cancel()

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X