2 votes

Partitionner un Dataframe Spark sur la base d'une colonne spécifique et vider le contenu de chaque partition dans un csv.

J'utilise les API Java de spark 1.6.2 pour charger des données dans un Dataframe DF1 qui ressemble à ceci :

Key  Value
A    v1
A    v2
B    v3
A    v4

Maintenant, j'ai besoin de partitionner DF1 sur la base d'un sous-ensemble de valeurs dans la colonne "Key" et de vider chaque partition dans un fichier csv (en utilisant spark-csv).

Sortie souhaitée :

A.csv

Key Value
A   v1
A   v2
A   v4

B.csv

Key Value
B   v3

Pour l'instant, je construis un HashMap (myList) contenant le sous-ensemble de valeurs que je dois filtrer, puis j'effectue une itération en filtrant une clé différente à chaque itération. Avec le code suivant, j'obtiens ce que je veux, mais je me demande s'il existe un moyen plus efficace de le faire :

DF1 = <some operations>.cache();

for (Object filterKey: myList.keySet()) {
  DF2 = DF1.filter((String)myList.get(filterKey));

  DF2.write().format.format("com.databricks.spark.csv")
            .option("header", "true")
      .save("/" + filterKey + ".csv");
}

2voto

Vous y êtes presque, il ne vous reste plus qu'à ajouter l'élément partitionBy qui partitionnera les fichiers comme vous le souhaitez.

DF1
  .filter{case(key, value) => myList.contains(key))
  .write
  .partitionBy("key")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/my/basepath/")

Les fichiers seront maintenant stockés sous "/my/basepath/key=A/", "/my/basepath/key=B/", etc.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X