18 votes

Pourquoi la taille des données d'échange de diffusion Spark est-elle plus grande que la taille brute lors de la jointure ?

Je fais une jointure de diffusion de deux tables A et B. B est une table en cache créée avec le SQL Spark suivant :

create table B as select segment_ids_hash from  stb_ranker.c3po_segments
      where
        from_unixtime(unix_timestamp(string(dayid), 'yyyyMMdd')) >= CAST('2019-07-31 00:00:00.000000000' AS TIMESTAMP)
      and
        segmentid_check('(6|8|10|12|14|371|372|373|374|375|376|582|583|585|586|587|589|591|592|594|596|597|599|601|602|604|606|607|609|610|611|613|615|616)', seg_ids) = true
cache table B

La colonne 'segment_ids_hash' est de type entier et le résultat contient 36,4 millions d'enregistrements. La taille de la table en cache est d'environ 140 Mo, comme indiqué ci-dessous enter image description here

Puis j'ai fait le joint comme suit :

select count(*) from A broadcast join B on A.segment_ids_hash = B.segment_ids_hash

enter image description here

Ici, la taille des données d'échange de diffusion est d'environ 3,2 Go.

Ma question est de savoir pourquoi la taille des données d'échange de diffusion (3,2 Go) est tellement plus importante que celle des données brutes (~140 Mo). Quels sont les frais généraux ? Existe-t-il un moyen de réduire la taille des données d'échange de diffusion ?

Gracias

6voto

Long Vu Points 391

Tl ; dr : J'apprends également à connaître la source des mesures de la taille des données. Celle-ci n'est probablement que la taille estimée de l'opération, elle pourrait ne pas refléter la taille réelle des données. Ne vous en préoccupez pas trop pour l'instant.

Version complète :

Mise à jour : je suis revenu pour corriger quelques erreurs. Je vois que la réponse précédente manquait de profondeur, je vais donc essayer de creuser un peu plus pour celle-ci (je suis encore relativement nouveau pour répondre aux questions).

Mise à jour 2 : reformulation, suppression de quelques blagues trop poussées (sry)

Ok, alors cette chose peut être très longue mais je pense que cette métrique n'est pas vraiment la taille directe des données.

Pour commencer, j'ai fait un test pour celui-ci afin de reproduire les résultats avec 200 exécuteurs et 4 cœurs :

enter image description here

Cela a donné ce résultat : enter image description here

Maintenant je vois qu'il y a quelque chose d'intéressant, puisque la taille des données pour mon test est d'environ 1,2 Go et non de 3,2 Go, ce qui m'a conduit à lire le code source de Spark.

Quand je vais sur github, je vois que les 4 numéros de BroadcastExchange correspondent à ceci : Premier lien : BroadcastHashJoinExec : https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala enter image description here

Taille des données correspondant à celle-ci :

enter image description here J'ai trouvé que la relation val ici semble être un HashedRelationBroadcastMode.

Aller à HashedRelation https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala : enter image description here

Puisque nous avons Some(Numrows) (c'est le nombre de lignes du DF). Le cas de correspondance utilise le cas un (ligne 926:927)

Revenir à la partie constructeur de HashedRelation : enter image description here

Comme la jointure est pour un int haché, le type n'est pas Long => la jointure utilise UnsafeHashedRelation

À UnsafeHashedRelation :

enter image description here

Maintenant nous allons à l'endroit dans UnsafeHashedRelation qui détermine la taille estimée, j'ai trouvé ceci :

enter image description here

Concentrez-vous sur la taille estimée, notre cible est l'objet binaryMap (plus tard dans le code, assignez map = binaryMap)

Ensuite, ça va ici : enter image description here

binaryMap est une BytestoBytesMap, ce qui correspond à ici https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java

En sautant à la méthode getTotalMemoryConsumption (celle qui obtient estimatedSize), nous avons obtenu :

enter image description here

enter image description here

C'est l'impasse dans laquelle je me trouve actuellement. Juste mes deux centimes, je ne pense pas que ce soit un bug mais juste la taille estimée de la jointure, et comme il s'agit d'une taille estimée, je ne pense pas vraiment qu'elle doive être très précise (oui mais c'est bizarre d'être honnête dans ce cas car la différence est très grande).

Au cas où vous voudriez continuer à jouer avec le dataSize sur celui-ci. Une approche est d'impacter directement l'objet binaryMap en modifiant l'entrée de son constructeur. Regardez ça :

enter image description here

Il y a deux variables qui peuvent être configurées, qui sont MEMORY_OFFHEAP_ENABLED et BUFFER_PAGE size. Peut-être pouvez-vous essayer d'expérimenter avec ces deux configurations pendant spark-submit. C'est aussi la raison pour laquelle la taille de BroadcastExec ne change pas même lorsque vous changez le nombre d'exécuteurs et de cœurs.

En conclusion, je pense que la taille des données est une estimation générée par un mécanisme fascinant (j'attends également que quelqu'un de plus compétent m'explique ce mécanisme, car je suis en train de le creuser), et non pas directement la taille que vous avez mentionnée dans la première image (140 Mo). En tant que tel, il ne vaut probablement pas la peine de passer beaucoup de temps à réduire l'overhead de cette métrique particulière.

Quelques trucs liés aux bonus :

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkPlan-BroadcastExchangeExec.html

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-UnsafeRow.html

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X