Excellente réponse de @zzzeek. Pour ceux qui s'interrogent sur les mêmes stats, pour les requêtes que j'ai modifié @zzzeek code légèrement à la requête de ces mêmes enregistrements immédiatement après l'insertion, puis de convertir les enregistrements d'une liste des dicts.
Voici les résultats
SqlAlchemy ORM: Total time for 100000 records 11.9210000038 secs
SqlAlchemy ORM query: Total time for 100000 records 2.94099998474 secs
SqlAlchemy ORM pk given: Total time for 100000 records 7.51800012589 secs
SqlAlchemy ORM pk given query: Total time for 100000 records 3.07699990273 secs
SqlAlchemy Core: Total time for 100000 records 0.431999921799 secs
SqlAlchemy Core query: Total time for 100000 records 0.389000177383 secs
sqlite3: Total time for 100000 records 0.459000110626 sec
sqlite3 query: Total time for 100000 records 0.103999853134 secs
Intéressant de noter que l'interrogation à l'aide de nu sqlite3 est encore d'environ 3 fois plus rapide que l'utilisation de SQLAlchemy de Base. Je suppose que c'est le prix à payer pour avoir un ResultProxy retourné au lieu d'une simple sqlite3 ligne.
SQLAlchemy de Base est d'environ 8 fois plus rapide que l'utilisation d'un ORM. Si l'interrogation à l'aide de la moraine d'oak ridges est beaucoup plus lent, peu importe quoi.
Voici le code que j'ai utilisé:
import time
import sqlite3
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.sql import select
Base = declarative_base()
DBSession = scoped_session(sessionmaker())
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True)
name = Column(String(255))
def init_sqlalchemy(dbname = 'sqlite:///sqlalchemy.db'):
global engine
engine = create_engine(dbname, echo=False)
DBSession.remove()
DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
def test_sqlalchemy_orm(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in range(n):
customer = Customer()
customer.name = 'NAME ' + str(i)
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print "SqlAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs"
t0 = time.time()
q = DBSession.query(Customer)
dict = [{'id':r.id, 'name':r.name} for r in q]
print "SqlAlchemy ORM query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs"
def test_sqlalchemy_orm_pk_given(n=100000):
init_sqlalchemy()
t0 = time.time()
for i in range(n):
customer = Customer(id=i+1, name="NAME " + str(i))
DBSession.add(customer)
if i % 1000 == 0:
DBSession.flush()
DBSession.commit()
print "SqlAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs"
t0 = time.time()
q = DBSession.query(Customer)
dict = [{'id':r.id, 'name':r.name} for r in q]
print "SqlAlchemy ORM pk given query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs"
def test_sqlalchemy_core(n=100000):
init_sqlalchemy()
t0 = time.time()
engine.execute(
Customer.__table__.insert(),
[{"name":'NAME ' + str(i)} for i in range(n)]
)
print "SqlAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs"
conn = engine.connect()
t0 = time.time()
sql = select([Customer.__table__])
q = conn.execute(sql)
dict = [{'id':r[0], 'name':r[0]} for r in q]
print "SqlAlchemy Core query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs"
def init_sqlite3(dbname):
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute("DROP TABLE IF EXISTS customer")
c.execute("CREATE TABLE customer (id INTEGER NOT NULL, name VARCHAR(255), PRIMARY KEY(id))")
conn.commit()
return conn
def test_sqlite3(n=100000, dbname = 'sqlite3.db'):
conn = init_sqlite3(dbname)
c = conn.cursor()
t0 = time.time()
for i in range(n):
row = ('NAME ' + str(i),)
c.execute("INSERT INTO customer (name) VALUES (?)", row)
conn.commit()
print "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec"
t0 = time.time()
q = conn.execute("SELECT * FROM customer").fetchall()
dict = [{'id':r[0], 'name':r[0]} for r in q]
print "sqlite3 query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs"
if __name__ == '__main__':
test_sqlalchemy_orm(100000)
test_sqlalchemy_orm_pk_given(100000)
test_sqlalchemy_core(100000)
test_sqlite3(100000)
J'ai aussi testé sans convertir le résultat de la requête à dicts et les stats sont similaires:
SqlAlchemy ORM: Total time for 100000 records 11.9189999104 secs
SqlAlchemy ORM query: Total time for 100000 records 2.78500008583 secs
SqlAlchemy ORM pk given: Total time for 100000 records 7.67199993134 secs
SqlAlchemy ORM pk given query: Total time for 100000 records 2.94000005722 secs
SqlAlchemy Core: Total time for 100000 records 0.43700003624 secs
SqlAlchemy Core query: Total time for 100000 records 0.131000041962 secs
sqlite3: Total time for 100000 records 0.500999927521 sec
sqlite3 query: Total time for 100000 records 0.0859999656677 secs
L'interrogation avec SQLAlchemy de Base est environ 20 fois plus rapide par rapport à l'ORM.
Important de noter que ces tests sont très superficiels et ne doit pas être pris trop au sérieux. J'ai peut-être raté quelques évident astuces qui pourraient changer les stats complètement.
La meilleure façon de mesurer les améliorations de performance est directement dans votre propre application. Ne prenez pas mes stats pour acquis.