ÉDITER (2019-03-08) : Gord Thompson a commenté ci-dessous avec de bonnes nouvelles provenant des journaux de mise à jour de sqlalchemy : _Depuis SQLAlchemy 1.3.0, publié le 2019-03-04, sqlalchemy prend désormais en charge engine = create_engine(sqlalchemy_url, fast_executemany=True)
pour le mssql+pyodbc
dialecte. C'est-à-dire qu'il n'est plus nécessaire de définir une fonction et d'utiliser @event.listens_for(engine, 'before_cursor_execute')
_ Cela signifie que la fonction ci-dessous peut être supprimée et que seul le drapeau doit être activé dans l'instruction create_engine - tout en conservant l'accélération.
Poste original :
Je viens de créer un compte pour poster ceci. Je voulais commenter le fil de discussion ci-dessus car il s'agit d'un suivi de la réponse déjà fournie. La solution ci-dessus a fonctionné pour moi avec le pilote SQL version 17 sur un stockage Microsft SQL écrivant à partir d'une installation basée sur Ubuntu.
Le code complet que j'ai utilisé pour accélérer les choses de manière significative (en parlant d'une accélération >100x) est ci-dessous. Il s'agit d'un code clé en main, à condition que vous modifiiez la chaîne de connexion avec vos données pertinentes. Pour l'affiche ci-dessus, merci beaucoup pour la solution car j'ai déjà cherché pendant un certain temps.
import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus
conn = "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)
@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
print("FUNC call")
if executemany:
cursor.fast_executemany = True
table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))
s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)
Sur la base des commentaires ci-dessous, j'ai voulu prendre le temps d'expliquer certaines limites concernant les pandas. to_sql
et la façon dont la requête est traitée. Il y a 2 choses qui peuvent causer le MemoryError
qui ont été soulevées afaik :
1) Supposons que vous écrivez sur un stockage SQL distant. Lorsque vous essayez d'écrire un grand DataFrame pandas avec la fonction to_sql
il convertit l'ensemble du dataframe en une liste de valeurs. Cette transformation occupe beaucoup plus de mémoire vive que le DataFrame original (en plus, car l'ancien DataFrame reste toujours présent en mémoire vive). Cette liste est fournie à la méthode finale executemany
pour votre connecteur ODBC. Je pense que le connecteur ODBC a quelques difficultés à gérer des requêtes aussi volumineuses. Une façon de résoudre ce problème est de fournir l'option to_sql
méthode d'un argument de taille de morceau (10**5 semble être autour de l'optimum donnant environ 600 mbit/s ( !) de vitesse d'écriture sur un 2 CPU 7GB ram MSSQL Storage application de Azure - je ne peux pas recommander Azure btw). Ainsi, la première limitation, à savoir la taille de la requête, peut être contournée en fournissant un argument de type chunksize
argument. Cependant, cela ne vous permettra pas d'écrire un cadre de données de la taille de 10**7 ou plus, (du moins pas sur la VM avec laquelle je travaille qui a ~55GB RAM), ce qui est le problème n°2.
Il est possible de contourner ce problème en divisant le DataFrame à l'aide de np.split
(étant des morceaux de DataFrame de taille 10**6) Ceux-ci peuvent être écrits de manière itérative. J'essaierai de faire une demande de pull lorsque j'aurai une solution prête pour le système de gestion des données. to_sql
dans le noyau de pandas lui-même, de sorte que vous n'aurez pas à faire ce travail de pré-rupture à chaque fois. Quoi qu'il en soit, j'ai fini par écrire une fonction similaire (mais pas clé en main) à la suivante :
import pandas as pd
import numpy as np
def write_df_to_sql(df, **kwargs):
chunks = np.split(df, df.shape()[0] / 10**6)
for chunk in chunks:
chunk.to_sql(**kwargs)
return True
Un exemple plus complet de l'extrait ci-dessus peut être consulté ici : https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py
Il s'agit d'une classe que j'ai écrite et qui incorpore le correctif et allège une partie de la surcharge nécessaire à la mise en place de connexions avec SQL. Je dois encore écrire de la documentation. J'avais également l'intention de contribuer le patch à pandas lui-même, mais je n'ai pas encore trouvé une façon agréable de le faire.
J'espère que cela vous aidera.