Tl ; dr : J'apprends également à connaître la source des mesures de la taille des données. Celle-ci n'est probablement que la taille estimée de l'opération, elle pourrait ne pas refléter la taille réelle des données. Ne vous en préoccupez pas trop pour l'instant.
Version complète :
Mise à jour : je suis revenu pour corriger quelques erreurs. Je vois que la réponse précédente manquait de profondeur, je vais donc essayer de creuser un peu plus pour celle-ci (je suis encore relativement nouveau pour répondre aux questions).
Mise à jour 2 : reformulation, suppression de quelques blagues trop poussées (sry)
Ok, alors cette chose peut être très longue mais je pense que cette métrique n'est pas vraiment la taille directe des données.
Pour commencer, j'ai fait un test pour celui-ci afin de reproduire les résultats avec 200 exécuteurs et 4 cœurs :
Cela a donné ce résultat :
Maintenant je vois qu'il y a quelque chose d'intéressant, puisque la taille des données pour mon test est d'environ 1,2 Go et non de 3,2 Go, ce qui m'a conduit à lire le code source de Spark.
Quand je vais sur github, je vois que les 4 numéros de BroadcastExchange correspondent à ceci : Premier lien : BroadcastHashJoinExec : https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
Taille des données correspondant à celle-ci :
J'ai trouvé que la relation val ici semble être un HashedRelationBroadcastMode.
Aller à HashedRelation https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala :
Puisque nous avons Some(Numrows) (c'est le nombre de lignes du DF). Le cas de correspondance utilise le cas un (ligne 926:927)
Revenir à la partie constructeur de HashedRelation :
Comme la jointure est pour un int haché, le type n'est pas Long => la jointure utilise UnsafeHashedRelation
À UnsafeHashedRelation :
Maintenant nous allons à l'endroit dans UnsafeHashedRelation qui détermine la taille estimée, j'ai trouvé ceci :
Concentrez-vous sur la taille estimée, notre cible est l'objet binaryMap (plus tard dans le code, assignez map = binaryMap)
Ensuite, ça va ici :
binaryMap est une BytestoBytesMap, ce qui correspond à ici https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
En sautant à la méthode getTotalMemoryConsumption (celle qui obtient estimatedSize), nous avons obtenu :
C'est l'impasse dans laquelle je me trouve actuellement. Juste mes deux centimes, je ne pense pas que ce soit un bug mais juste la taille estimée de la jointure, et comme il s'agit d'une taille estimée, je ne pense pas vraiment qu'elle doive être très précise (oui mais c'est bizarre d'être honnête dans ce cas car la différence est très grande).
Au cas où vous voudriez continuer à jouer avec le dataSize sur celui-ci. Une approche est d'impacter directement l'objet binaryMap en modifiant l'entrée de son constructeur. Regardez ça :
Il y a deux variables qui peuvent être configurées, qui sont MEMORY_OFFHEAP_ENABLED et BUFFER_PAGE size. Peut-être pouvez-vous essayer d'expérimenter avec ces deux configurations pendant spark-submit. C'est aussi la raison pour laquelle la taille de BroadcastExec ne change pas même lorsque vous changez le nombre d'exécuteurs et de cœurs.
En conclusion, je pense que la taille des données est une estimation générée par un mécanisme fascinant (j'attends également que quelqu'un de plus compétent m'explique ce mécanisme, car je suis en train de le creuser), et non pas directement la taille que vous avez mentionnée dans la première image (140 Mo). En tant que tel, il ne vaut probablement pas la peine de passer beaucoup de temps à réduire l'overhead de cette métrique particulière.
Quelques trucs liés aux bonus :
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-BroadcastExchangeExec.html
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UnsafeRow.html