2 votes

Comment créer un moteur de connexion de base de données dans chaque sous-processus Dask pour paralléliser des milliers de requêtes SQL, sans recréer le moteur à chaque requête

J'ai besoin de paralléliser de manière embarrassante le travail de récupération pour des milliers de requêtes sql à partir de la base de données. Voici un exemple simplifié.

##Infos Env: python=3.7 postgresql=10 dask=latest
##Générer la table de la base de données exemple.
from sqlalchemy import create_engine
import pandas as pd
import numpy as np

engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
data = pd.DataFrame(np.random.randint(0,100 , size=(30000,5)),columns=['a','b','c','d','e'])
data.to_sql('tablename',engine,index=True,if_exists='append')

Tout d'abord, voici l'exemple de base sans parallélisme dask.

from sqlalchemy import create_engine
import pandas as pd
import numpy as np

engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
def job(indexstr):
    'envoyer la requête, récupérer les données, faire des calculs et retourner'
    sql='select * from public.tablename where index='+indexstr
    df=pd.read_sql_query(sql, engine, index_col='index',)
    ##Obtenir les données et faire des analyses.
    return np.sum(df.values)
for v in range(1000):
    lists.append(job(str(v)))
### wall time:17s

Ce n'est pas aussi rapide que nous l'imaginons car à la fois la requête à la base de données et l'analyse des données peuvent prendre du temps et il y a plus de cpu inutilisés.

Ensuite, j'essaie d'utiliser dask pour le paralléliser comme ceci.

def jobWithEngine(indexstr):
    `le moteur ne peut être sérialisé entre les processus, donc chaque processus en crée un.`
    engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
    sql='select * from public.tablename where index='+indexstr
    df=pd.read_sql_query(sql, engine, index_col='index',)
    return np.sum(df.values)
import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:jobWithEngine(x))
results_bag = dbdata.compute()
###Wall time:1min8s

Le problème est que je trouve que la création du moteur prend plus de temps et qu'il y en a des milliers.

Il sera recréé à chaque requête sql, ce qui est vraiment coûteux et cela pourrait faire planter le service de la base de données!

Alors je suppose qu'il doit y avoir une manière plus élégante comme ceci:

import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:job(x,init=create_engine))
results_bag = dbdata.compute()

1.Le processus principal crée 8 sous-processus.

2.Chaque processus crée son propre moteur pour initialiser la préparation du travail.

3.Ensuite, le processus principal leur envoie 1000 tâches et récupère les 1000 résultats.

4.Une fois que tout est fait, le moteur du sous-processus est arrêté et le sous-processus est tué.

Ou bien dask a déjà fait cela et le temps additionnel vient des communications entre les processus?

2voto

AmyChodorowski Points 331

Vous pouvez le faire en définissant une base de données connectée en tant que variable pour chaque travailleur en utilisant get_worker

from dask.distributed import get_worker

def connect_worker_db(db):
    worker = get_worker()
    worker.db = db          # Paramètres de la base de données, mot de passe, nom d'utilisateur, etc
    worker.db.connect()     # Fonction qui connecte la base de données, par exemple create_engine()

Ensuite, faites en sorte que le client exécute le connect_worker_db:

from dask.distributed import Client, get_worker
client = Client()
client.run(connect_worker_db, db)

Pour la fonction utilisant la connexion, comme jobWithEngine(), vous devez obtenir le travailleur et utiliser le paramètre que vous avez enregistré :

def jobWithEngine():
    db = get_worker().db

Ensuite, assurez-vous de vous déconnecter à la fin :

def disconnect_worker_db():
    worker = get_worker()
    worker.db.disconnect()

client.run(disconnect_worker_db)

2voto

Enda Farrell Points 88

La réponse d'Amy a l'avantage d'être simple, mais si pour une raison quelconque dask démarre de nouveaux travailleurs, ils n'auront pas de .db.

Je ne sais pas quand il a été introduit pour la première fois, mais Dask 1.12.2 a un Client.register_worker_callbacks qui prend une fonction en paramètre prévue pour ce type d'utilisation. Si ce callback prend un paramètre appelé dask_worker alors le travailleur lui-même sera passé.

def main():

    dask_client = dask.distributed.Client(cluster)

    db = dict(
        host="db-host",
        username="user",
        # etc etc
    )
    def worker_setup(dask_worker: dask.distributed.Worker):
        dask_worker.db = db

    dask_client.register_worker_callbacks(worker_setup)

https://distributed.dask.org/fr/latest/api.html#distributed.Client.register_worker_callbacks

Cependant, cela ne ferme pas les connexions de la base de données à la fin. Vous probablement serez couvert avec client.run(disconnect_worker_db) mais j'ai vu certains travailleurs ne pas libérer leurs ressources. Pour corriger cela de manière plus complète, cela nécessite un peu plus de code comme indiqué dans https://distributed.dask.org/fr/latest/api.html#distributed.Client.register_worker_plugin

class MyWorkerPlugin(dask.distributed.WorkerPlugin):
    def __init__(self, *args, **kwargs):
        self.db = kwargs.get("db")
        assert self.db, "no db"

    def setup(self, worker: dask.distributed.Worker):
        worker.db = self.db

    def teardown(self, worker: dask.distributed.Worker):
        print(f"worker {worker.name} teardown")
        # eg db.disconnect()

def main():

    cluster = dask.distributed.LocalCluster(
        n_workers=os.cpu_count(),
        threads_per_worker=2,
    )
    dask_client = dask.distributed.Client(cluster)
    db = dict(
        host="db-host",
        username="user",
        # etc etc
    )

    dask_client.register_worker_plugin(MyWorkerPlugin, "set-dbs", db=db)
    dask_client.start()

Vous pouvez donner des noms de plugin un peu utiles, et passer des kwargs à utiliser dans le __init__ du plugin.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X