J'ai besoin de paralléliser de manière embarrassante le travail de récupération pour des milliers de requêtes sql à partir de la base de données. Voici un exemple simplifié.
##Infos Env: python=3.7 postgresql=10 dask=latest
##Générer la table de la base de données exemple.
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
data = pd.DataFrame(np.random.randint(0,100 , size=(30000,5)),columns=['a','b','c','d','e'])
data.to_sql('tablename',engine,index=True,if_exists='append')
Tout d'abord, voici l'exemple de base sans parallélisme dask.
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
def job(indexstr):
'envoyer la requête, récupérer les données, faire des calculs et retourner'
sql='select * from public.tablename where index='+indexstr
df=pd.read_sql_query(sql, engine, index_col='index',)
##Obtenir les données et faire des analyses.
return np.sum(df.values)
for v in range(1000):
lists.append(job(str(v)))
### wall time:17s
Ce n'est pas aussi rapide que nous l'imaginons car à la fois la requête à la base de données et l'analyse des données peuvent prendre du temps et il y a plus de cpu inutilisés.
Ensuite, j'essaie d'utiliser dask pour le paralléliser comme ceci.
def jobWithEngine(indexstr):
`le moteur ne peut être sérialisé entre les processus, donc chaque processus en crée un.`
engine = create_engine('postgresql://dbadmin:dbadmin@server:5432/db01')
sql='select * from public.tablename where index='+indexstr
df=pd.read_sql_query(sql, engine, index_col='index',)
return np.sum(df.values)
import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:jobWithEngine(x))
results_bag = dbdata.compute()
###Wall time:1min8s
Le problème est que je trouve que la création du moteur prend plus de temps et qu'il y en a des milliers.
Il sera recréé à chaque requête sql, ce qui est vraiment coûteux et cela pourrait faire planter le service de la base de données!
Alors je suppose qu'il doit y avoir une manière plus élégante comme ceci:
import dask
dask.config.set(scheduler='processes')
import dask.bag as db
dbdata=db.from_sequence([str(v) for v in range(1000)])
dbdata=dbdata.map(lambda x:job(x,init=create_engine))
results_bag = dbdata.compute()
1.Le processus principal crée 8 sous-processus.
2.Chaque processus crée son propre moteur pour initialiser la préparation du travail.
3.Ensuite, le processus principal leur envoie 1000 tâches et récupère les 1000 résultats.
4.Une fois que tout est fait, le moteur du sous-processus est arrêté et le sous-processus est tué.
Ou bien dask a déjà fait cela et le temps additionnel vient des communications entre les processus?