71 votes

Accélération de pandas.DataFrame.to_sql avec fast_executemany de pyODBC

Je voudrais envoyer un grand pandas.DataFrame vers un serveur distant exécutant MS SQL. La façon dont je procède actuellement consiste à convertir un fichier de type data_frame à une liste de tuples, puis l'envoyer avec la fonction pyODBC executemany() fonction. Cela donne quelque chose comme ça :

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

J'ai alors commencé à me demander si les choses pouvaient être accélérées (ou au moins plus lisibles) en utilisant data_frame.to_sql() méthode. J'ai trouvé la solution suivante :

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

Maintenant le code est plus lisible, mais le téléchargement est au moins 150 fois plus lent ...

Y a-t-il un moyen d'inverser le fast_executemany lors de l'utilisation de SQLAlchemy ?

J'utilise pandas-0.20.3, pyODBC-4.0.21 et sqlalchemy-1.1.13.

78voto

hetspookjee Points 281

ÉDITER (2019-03-08) : Gord Thompson a commenté ci-dessous avec de bonnes nouvelles provenant des journaux de mise à jour de sqlalchemy : _Depuis SQLAlchemy 1.3.0, publié le 2019-03-04, sqlalchemy prend désormais en charge engine = create_engine(sqlalchemy_url, fast_executemany=True) pour le mssql+pyodbc dialecte. C'est-à-dire qu'il n'est plus nécessaire de définir une fonction et d'utiliser @event.listens_for(engine, 'before_cursor_execute')_ Cela signifie que la fonction ci-dessous peut être supprimée et que seul le drapeau doit être activé dans l'instruction create_engine - tout en conservant l'accélération.

Poste original :

Je viens de créer un compte pour poster ceci. Je voulais commenter le fil de discussion ci-dessus car il s'agit d'un suivi de la réponse déjà fournie. La solution ci-dessus a fonctionné pour moi avec le pilote SQL version 17 sur un stockage Microsft SQL écrivant à partir d'une installation basée sur Ubuntu.

Le code complet que j'ai utilisé pour accélérer les choses de manière significative (en parlant d'une accélération >100x) est ci-dessous. Il s'agit d'un code clé en main, à condition que vous modifiiez la chaîne de connexion avec vos données pertinentes. Pour l'affiche ci-dessus, merci beaucoup pour la solution car j'ai déjà cherché pendant un certain temps.

import pandas as pd
import numpy as np
import time
from sqlalchemy import create_engine, event
from urllib.parse import quote_plus

conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
quoted = quote_plus(conn)
new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
engine = create_engine(new_con)

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    print("FUNC call")
    if executemany:
        cursor.fast_executemany = True

table_name = 'fast_executemany_test'
df = pd.DataFrame(np.random.random((10**4, 100)))

s = time.time()
df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
print(time.time() - s)

Sur la base des commentaires ci-dessous, j'ai voulu prendre le temps d'expliquer certaines limites concernant les pandas. to_sql et la façon dont la requête est traitée. Il y a 2 choses qui peuvent causer le MemoryError qui ont été soulevées afaik :

1) Supposons que vous écrivez sur un stockage SQL distant. Lorsque vous essayez d'écrire un grand DataFrame pandas avec la fonction to_sql il convertit l'ensemble du dataframe en une liste de valeurs. Cette transformation occupe beaucoup plus de mémoire vive que le DataFrame original (en plus, car l'ancien DataFrame reste toujours présent en mémoire vive). Cette liste est fournie à la méthode finale executemany pour votre connecteur ODBC. Je pense que le connecteur ODBC a quelques difficultés à gérer des requêtes aussi volumineuses. Une façon de résoudre ce problème est de fournir l'option to_sql méthode d'un argument de taille de morceau (10**5 semble être autour de l'optimum donnant environ 600 mbit/s ( !) de vitesse d'écriture sur un 2 CPU 7GB ram MSSQL Storage application de Azure - je ne peux pas recommander Azure btw). Ainsi, la première limitation, à savoir la taille de la requête, peut être contournée en fournissant un argument de type chunksize argument. Cependant, cela ne vous permettra pas d'écrire un cadre de données de la taille de 10**7 ou plus, (du moins pas sur la VM avec laquelle je travaille qui a ~55GB RAM), ce qui est le problème n°2.

Il est possible de contourner ce problème en divisant le DataFrame à l'aide de np.split (étant des morceaux de DataFrame de taille 10**6) Ceux-ci peuvent être écrits de manière itérative. J'essaierai de faire une demande de pull lorsque j'aurai une solution prête pour le système de gestion des données. to_sql dans le noyau de pandas lui-même, de sorte que vous n'aurez pas à faire ce travail de pré-rupture à chaque fois. Quoi qu'il en soit, j'ai fini par écrire une fonction similaire (mais pas clé en main) à la suivante :

import pandas as pd
import numpy as np

def write_df_to_sql(df, **kwargs):
    chunks = np.split(df, df.shape()[0] / 10**6)
    for chunk in chunks:
        chunk.to_sql(**kwargs)
    return True

Un exemple plus complet de l'extrait ci-dessus peut être consulté ici : https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

Il s'agit d'une classe que j'ai écrite et qui incorpore le correctif et allège une partie de la surcharge nécessaire à la mise en place de connexions avec SQL. Je dois encore écrire de la documentation. J'avais également l'intention de contribuer le patch à pandas lui-même, mais je n'ai pas encore trouvé une façon agréable de le faire.

J'espère que cela vous aidera.

0 votes

Merci pour cela. Je suis sûr que ça va nous aider ! Une dernière chose à noter : pyODBC doit être à 4.0.19 ou plus.

0 votes

Mon code ressemble au votre sauf la création du moteur : engine = sa.create_engine('mssql+pyodbc://SERVER/DATABASE?driver=SQL+‌​Server+Native+Client‌​+11.0') . pensez-vous que le client natif a quelque chose à voir avec la déconnexion ?

0 votes

@j.k. Je viens d'écrire un peu de code passe-partout autour de votre solution, qui fait effectivement le travail, donc je dois vous remercier pour l'effort principal ! @ Cameron Taylor : je ne suis pas sûr, mais cela semble probable, j'espère que vous pourrez l'essayer avec un autre pilote.

40voto

J.K. Points 600

Après avoir contacté les développeurs de SQLAlchemy, un moyen de résoudre ce problème est apparu. Un grand merci à eux pour leur excellent travail !

Il faut utiliser un événement d'exécution du curseur et vérifier si l'événement d'exécution du curseur a été exécuté. executemany a été levé. Si c'est effectivement le cas, changez le fast_executemany option sur. Par exemple :

from sqlalchemy import event

@event.listens_for(engine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
    if executemany:
        cursor.fast_executemany = True

De plus amples informations sur les événements d'exécution sont disponibles ici .


UPDATE : Soutien aux fast_executemany de pyodbc a été ajouté dans SQLAlchemy 1.3.0 Cette modification n'est donc plus nécessaire.

2 votes

Merci beaucoup d'avoir fait le travail de recherche sur ce sujet. Par souci de clarté, ce décorateur et cette fonction doivent être déclarés avant l'instanciation d'un moteur SQLAlchemy ?

2 votes

Vous êtes les bienvenus. Je le déclare juste après avoir instancié le moteur dans le constructeur d'une classe.

1 votes

Ainsi, le code de connexion spécifique à pyodbc n'est plus nécessaire ? il suffit d'appeler to_sql() après cette fonction ?

19voto

Emmanuel Points 291

J'ai rencontré le même problème mais en utilisant PostgreSQL. Ils publient maintenant pandas version 0.24.0 et il y a un nouveau paramètre dans le to_sql fonction appelée method ce qui a résolu mon problème.

from sqlalchemy import create_engine

engine = create_engine(your_options)
data_frame.to_sql(table_name, engine, method="multi")

La vitesse de téléchargement est 100x plus rapide pour moi. Je recommande également de définir le paramètre chunksize si vous comptez envoyer beaucoup de données.

2 votes

Selon pandas.pydata.org/pandas-docs/stable/guide_utilisateur/ , réglage method='multi' est susceptible de ralentir les insertions sur les SGBDR traditionnels lors du chargement dans des tables avec de nombreuses colonnes, mais tend à être plus utile pour des environnements comme Redshift, lorsqu'il s'agit de tables larges.

9voto

Pylander Points 595

Je voulais juste poster cet exemple complet comme une option supplémentaire et performante pour ceux qui peuvent utiliser la nouvelle bibliothèque turbodbc : http://turbodbc.readthedocs.io/en/latest/

Il est clair qu'il existe de nombreuses options entre pandas .to_sql(), le déclenchement de fast_executemany via sqlalchemy, l'utilisation directe de pyodbc avec des tuples/listes/etc. ou même l'essai de BULK UPLOAD avec des fichiers plats.

Avec un peu de chance, ce qui suit pourrait rendre la vie un peu plus agréable à mesure que la fonctionnalité évolue dans le projet pandas actuel ou inclut quelque chose comme l'intégration de turbodbc à l'avenir.

import pandas as pd
import numpy as np
from turbodbc import connect, make_options
from io import StringIO

test_data = '''id,transaction_dt,units,measures
               1,2018-01-01,4,30.5
               1,2018-01-03,4,26.3
               2,2018-01-01,3,12.7
               2,2018-01-03,3,8.8'''

df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])

options = make_options(parameter_sets_to_buffer=1000)
conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)

test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]

                CREATE TABLE [db_name].[schema].[test]
                (
                    id int NULL,
                    transaction_dt datetime NULL,
                    units int NULL,
                    measures float NULL
                )

                INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                VALUES (?,?,?,?) '''

cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]

turbodbc devrait être TRÈS rapide dans de nombreux cas d'utilisation (en particulier avec les tableaux numpy). Observez comment il est simple de passer les tableaux numpy sous-jacents des colonnes du dataframe comme paramètres à la requête directement. Je pense également que cela permet d'éviter la création d'objets intermédiaires qui augmentent excessivement la consommation de mémoire. J'espère que cela vous sera utile !

0 votes

Je vais essayer dans les prochains jours et je reviendrai avec mes résultats.

0 votes

@erickfis est-ce que cela s'est avéré utile pour vous ? Ce serait formidable d'entendre vos conclusions ici.

0 votes

Salut Pylander ! Je n'ai pas encore eu le temps d'essayer, je suis assez occupé. Pour l'instant, j'utilise un outil de la société pour ingérer les données. Mais j'en ai vraiment besoin pour les prochains projets, pour ingérer des données massives sur sql server. L'inconvénient majeur que je vois est que mes dfs ont 240 colonnes chacun. En utilisant pd.to_sql, je n'ai pas besoin de me soucier de chaque colonne. Mais là encore, pd.to_sql est vraiment lent, au point d'être prohibitif. L'utilisation de turbodbc pourrait être ma solution, mais le fait de devoir saisir manuellement chacune de ces 240 colonnes ne me semble pas optimal (étant donné qu'il y a beaucoup de df différents à ingérer).

7voto

Ilja Everilä Points 20976

Il semble que Pandas 0.23.0 et 0.24.0 utiliser des insertions à valeurs multiples avec PyODBC, ce qui empêche l'exécution rapide d'aider - un seul INSERT ... VALUES ... est émise par chunk. Les chunks d'insertion à valeurs multiples sont une amélioration par rapport à l'ancienne méthode lente d'exécution par défaut, mais au moins dans les tests simples, la méthode rapide d'exécution l'emporte toujours, sans parler du fait qu'il n'est pas nécessaire d'effectuer des opérations manuelles. chunksize comme c'est le cas pour les insertions de valeurs multiples. Forcer l'ancien comportement peut être fait par monkeypatching, si aucune option de configuration n'est fournie dans le futur :

import pandas.io.sql

def insert_statement(self, data, conn):
    return self.table.insert(), data

pandas.io.sql.SQLTable.insert_statement = insert_statement

L'avenir est là et au moins dans le master la branche de la méthode d'insertion peut être contrôlée à l'aide de l'argument mot-clé method= de to_sql() . La valeur par défaut est None qui force l'exécution de plusieurs méthodes. Passage de method='multi' résultats en utilisant l'insert multi-valeurs. Il peut même être utilisé pour mettre en œuvre des approches spécifiques aux SGBD, comme Postgresql. COPY .

1 votes

Les développeurs de pandas ont fait des allers-retours sur cette question pendant un certain temps, mais ils ont finalement semblé renoncer à l'approche de l'insertion de plusieurs rangs, du moins pour un certain temps. mssql+pyodbc Moteur SQLAlchemy. pandas 0.23.4 laisse en effet fast_executemany faire son travail.

2 votes

Je n'ai pas vérifié la situation actuelle, mais il a été réintégré dans la version 0.24.0. Edit : il est toujours là, du moins dans la version 0.24.0. master mais c'est contrôlable maintenant : github.com/pandas-dev/pandas/blob/master/pandas/io/sql.py#L1157 . On dirait que passer to_sql(..., method=None) devrait forcer l'exécution de plusieurs approches.

2 votes

...Et None est la valeur par défaut.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X