Comment un exécuteur Spark exécute-t-il le code ? A-t-il plusieurs threads en cours d'exécution ? Si oui, ouvrira-t-il plusieurs connexions JDBC pour lire/écrire des données depuis/vers le SGBDR ?
Réponses
Trop de publicités?Comment un exécuteur Spark exécute-t-il le code ?
La beauté de l'open source, y compris le projet Apache Spark, est que vous pouvez voir le code et trouver la réponse vous-même. Cela ne veut pas dire que c'est la meilleure et la seule façon de trouver la réponse, mais la mienne peut ne pas être aussi claire que le code lui-même (le contraire peut aussi être vrai :))
Ceci étant dit, voir le code de Exécuteur testamentaire vous-même.
A-t-il plusieurs fils en cours d'exécution ?
Oui. Voir cette ligne donde Executor
crée un nouveau TaskRunner
qui est un Java Runnable
(un fil séparé). Ce site Runnable
va être exécuté sur le pool de threads .
En citant l'article de Java Exécuteurs.newCachedThreadPool que Spark utilise pour le pool de threads :
Crée un pool de threads qui crée de nouveaux threads selon les besoins, mais réutilise les threads précédemment construits lorsqu'ils sont disponibles, et utilise le ThreadFactory fourni pour créer de nouveaux threads selon les besoins.
Si oui, ouvrira-t-il plusieurs connexions JDBC pour lire/écrire les données du SGBDR ?
Je suis sûr que vous connaissez déjà la réponse. Oui, il ouvrira des connexions multiples et c'est pourquoi vous devriez utiliser la fonction foreachPartition
opération pour _"appliquer une fonction f
à chaque partition de cet ensemble de données." (il en va de même pour les RDD) et une sorte de pool de connexion.
Vous pouvez facilement tester cela en exécutant spark sur votre ordinateur local.
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("JDBCTest")
val sc = new SparkContext(conf)
Dans l'extrait ci-dessus, local[2] signifie deux threads. Maintenant, si vous ouvrez une connexion JDBC pendant le traitement des RDD, spark le fera pour chaque tâche.
La transformation et les actions s'exécutent en parallèle dans spark, par conception spark est plus efficace dans l'exécution de tâches en mémoire, donc en premier lieu nous devons éviter d'écrire un code qui nécessite l'ouverture d'une connexion JDBC pour chaque RDD, au lieu de cela vous pouvez le charger en mémoire pour le traitement, voir le snippet ci-dessous.
Dataset<Row> jdbcDF = spark.read().format("jdbc").option("url", mySQLConnectionURL)
.option("driver", MYSQL_DRIVER).option("dbtable", sql).option("user", userId)
.option("password", dbpassword).load();
A la vôtre !