4 votes

Découverte et lecture de plusieurs fichiers dans Spark

Avoir des systèmes différents qui ont différents ensembles de fichiers (txt, csv) à charger, transformer et écrire dans des fichiers de sortie en utilisant Apache Spark/Scala. Par exemple, le système A a 3 fichiers et le système B a 2 fichiers dans leurs répertoires respectifs.

Type de fichier | Noms de fichiers
-----------------------------------------
Client          | Client_20190301.csv
Compte          | Compte_20190301.csv
Commande        | Commande_20190301.csv
Détails de commande | DétailsCommande_20190301.txt
Transactions    | Transactions_20190301.txt

Maintenant, je voudrais obtenir les noms de fichiers et les chemins en fonction du nom du système donné en entrée afin de pouvoir charger leurs fichiers système respectifs. Je ne veux pas créer de programmes séparés pour chaque système et charger leurs fichiers car les noms de fichiers ou les chemins pourraient changer à l'avenir.

Y a-t-il un moyen efficace de gérer cela? Utiliser des fichiers de configuration? Ou peut-être utiliser ou ne pas utiliser de bibliothèques externes? Veuillez me guider.

2voto

Sim Points 1884

Ce problème est un bon candidat pour une approche de diviser et conquérir :

  1. Décrivez le nombre de systèmes + les paramètres nécessaires pour paramétrer un traitement ultérieur. La manière de le faire dépend de vos environnements de déploiement, du langage de programmation choisi, etc. Il n'y a pas de réponse unique.

  2. Lisez les informations de (1) dans une structure de données.

  3. Générez une liste de fichiers à traiter en utilisant une combinaison de (2) et peut-être une liste de répertoires (récursive). Notez que, étant donné un chemin, vous pouvez obtenir un système de fichiers Hadoop dans Spark en utilisant FileSystem.get(new java.net.URI(path), new Configuration()).

  4. Groupez les fichiers par type.

  5. Pour chaque groupe, paramétrez un DataFrameReader à partir de spark.read de manière appropriée et appelez la version de chargement multiple en utilisant .load(paths: _*). Vous pouvez généraliser ce code en créant une map du nom du groupe à une fonction qui renvoie un DataFrameReader.

Voici un exemple de comment faire (5) :

val readers: Map[String, SparkSession => DataFrameReader] = Map(
  "client" -> ((spark: SparkSession) => spark.read.option("format", "csv"))
)

val groupes: Map[String, Seq[String]] = ???

groupes.map { case (nomGroupe, chemins) =>
  lecteurs(nomGroupe)(spark).load(chemins: _*)
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X