33 votes

Spark prend-il en charge les analyses de colonne réelles sur les fichiers parquet dans S3?

Un des grands avantages du Parquet format de stockage de données, c'est que c'est en forme de colonne. Si j'ai un grand jeu de données avec des centaines de colonnes, mais ma requête ne touche que quelques-uns de ces cas, il est possible de lire uniquement les données qui stocke ces quelques colonnes, et ignorer le reste.

Sans doute cette fonctionnalité fonctionne par la lecture d'un bit de métadonnées à la tête du parquet de fichier qui indique les emplacements sur le système de fichiers pour chaque colonne. Le lecteur peut alors chercher sur le disque à lire dans les colonnes.

Personne ne sait si l'étincelle par défaut du parquet lecteur correctement met en œuvre ce type de sélectifs de la recherche sur le S3? Je pense que c'est pris en charge par S3, mais il y a une grande différence entre le support théorique et une mise en œuvre correctement les exploits de soutien.

18voto

Steve Loughran Points 4637

Ce doit être décomposé

  1. Le Parquet code obtenir les prédicats de spark (oui)
  2. Ne parquet puis de tenter de les lire de manière sélective uniquement les colonnes, à l'aide de l'Hadoop FileSystem seek() + read() ou readFully(position, buffer, length) des appels? Oui
  3. Le S3 connecteur de traduire ces Opérations sur les Fichiers efficientes dans les requêtes HTTP GET? Dans Amazon EMR: Oui. Dans Apache Hadoop, vous avez besoin d'hadoop 2.8 sur le chemin de classe et de définir correctement spark.hadoop.fs.s3a.experimental.fadvise=random pour déclencher d'accès aléatoire.

Hadoop 2.7 et plus tôt gérer l'agressivité seek() complètent le fichier de mal, parce qu'ils ont toujours lancer un décalage de-fin-de-fichier, être surpris par la suivante, de chercher, d'avoir à abandonner cet égard, la réouverture d'un nouveau protocole TCP/HTTPS 1.1 connexion (lent, CPU lourd), le faire à nouveau, à plusieurs reprises. Le hasard IO opération fait mal sur le chargement en vrac des choses comme .csv.gz mais est essentiel pour obtenir ORC/Parquet perf.

Vous n'obtenez pas le speedup sur Hadoop 2.7 du hadoop-aws JAR. Si vous en avez besoin, vous devez mettre à jour hadoop*.jar et dépendances, ou de créer une Étincelle de toutes pièces contre Hadoop 2.8

Notez que Hadoop 2.8+ a également une belle petite fonctionnalité: si vous appelez toString() sur un S3A système de fichiers client dans un journal d'instruction, il imprime tout le système de fichiers IO statistiques, y compris la quantité de données a été écartée en cherche, abandonnée connexions TCP &c. Vous aidera à savoir ce qui se passe.

2018-04-13 avertissement:Ne tentez pas de baisse de l'Hadoop 2.8+ hadoop-aws JAR dans le classpath avec le reste de l'hadoop-2.7 POT ensemble et s'attendre à voir tout speedup. Tout ce que vous allez voir sont des traces de pile. Vous devez mettre à jour tous les hadoop Pots et leurs dépendances transitives.

11voto

Jacek Laskowski Points 6668

AVERTISSEMENT: je n'ai pas de réponse définitive et ne voulons pas agir comme une source fiable, mais ils ont passé un peu de temps sur le parquet de soutien Spark 2.2+ et j'espère que ma réponse peut nous aider tous à se rapprocher de la bonne réponse.


Ne Parquet sur S3 éviter de tirer sur les données pour les colonnes à partir de S3 et de récupérer uniquement le fichier de morceaux dont il a besoin, ou faut-il tirer tout le fichier?

J'utilise Étincelle 2.3.0-INSTANTANÉ que j'ai construit aujourd'hui dès le master.

parquet format de source de données est gérée par ParquetFileFormat qui est un FileFormat.

Si je suis correct, la lecture de la première partie est assurée par buildReaderWithPartitionValues méthode (qui remplace l' FileFormats').

buildReaderWithPartitionValues est utilisé exclusivement en FileSourceScanExec opérateur physique est demandée pour l'entrée Rdd qui sont en fait une seule RDD pour générer de l'intérieur des lignes lorsqu' WholeStageCodegenExec est exécutée.

Avec cela dit, je pense que l'examen de ce qu' buildReaderWithPartitionValues ne peut nous rapprocher de la réponse finale.

Quand vous regardez la ligne , vous pouvez obtenir assurés que nous sommes sur la bonne voie.

// Essayez de pousser à la baisse les filtres lorsque le filtre push-bas est activé.

Que le chemin d'accès du code dépend spark.sql.parquet.filterPushdown Étincelle de la propriété qui est activé par défaut.

spark.sql.parquet.filterPushdown Permet de Parquet filtre push-down optimisation lorsque la valeur true.

Qui nous conduit à parquet-hadoop est ParquetInputFormat.setFilterPredicate iff les filtres sont définis.

if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}

Le code devient plus intéressant un peu plus tard, lorsque les filtres sont utilisés lorsque le code revient à parquet-m. (plutôt que d'utiliser la soi-disant vectorisé parquet décodage du lecteur). C'est la partie que je ne comprends pas vraiment (sauf ce que je peux voir dans le code).

Veuillez noter que le vectorisé parquet de décodage lecteur est contrôlé par spark.sql.parquet.enableVectorizedReader Étincelle de la propriété qui est activée par défaut.

ASTUCE: Pour savoir quelle partie de l' if expression est utilisée, activer DEBUG niveau d'enregistrement d' org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat enregistreur.

Pour voir tous les poussés vers le bas les filtres vous pourriez tourner INFO niveau d'enregistrement de l' org.apache.spark.sql.execution.FileSourceScanExec logger sur. Vous devriez voir le suivant dans les logs:

INFO Pushed Filters: [pushedDownFilters]

J'espère que si c'est pas près d'être une réponse définitive, il a aidé un peu et quelqu'un le ramasse où je l'ai laissée, pour en faire un bientôt. L'espoir meurt en dernier :)

1voto

spektom Points 11130

Non, prédicat de refoulement n'est pas entièrement pris en charge. Ceci, bien sûr, dépend de:

  • Cas d'utilisation spécifiques
  • Spark version
  • S3 connecteur type et la version

Afin de vérifier votre cas d'utilisation spécifiques, vous pouvez activer le journal de DÉBOGAGE de niveau dans une Étincelle, et d'exécuter votre requête. Ensuite, vous pouvez voir si il y a des "cherche" au cours de S3 (HTTP), des demandes ainsi que le nombre de demandes ont été effectivement envoyé. Quelque chose comme ceci:

17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]" .... 17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"

Voici l'exemple d'un rapport qui a été ouvert récemment en raison de l'impossibilité de l'Étincelle 2.1 pour calculer COUNT(*) de toutes les lignes dans un jeu de données sur la base des métadonnées stockées dans le Parquet de fichier: https://issues.apache.org/jira/browse/SPARK-21074

1voto

KrazyGautam Points 1573

parquet lecteur de spark est juste comme n'importe quel autre InputFormat,

  1. Aucun des inputFormat ont quelque chose de spécial pour S3. Les formats d'entrée peut lire LocalFileSystem , Hdfs et S3 aucune optimisation particulière fait pour ça.

  2. Parquet InpuTFormat selon les colonnes que vous demandez sélectivement lire les colonnes pour vous .

  3. Si vous voulez être mort que (bien que pousser à la baisse les prédicats travaille dans la dernière étincelle version) sélectionner manuellement les colonnes et écrire la transformation et des actions , au lieu de dépendre SQL

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X