88 votes

Obtenir le nombre actuel de partitions d'un DataFrame

Existe-t-il un moyen d'obtenir le nombre actuel de partitions d'un DataFrame ? J'ai vérifié la javadoc de DataFrame (spark 1.6) et je n'ai pas trouvé de méthode pour cela, ou est-ce que je l'ai juste manqué ? (Dans le cas de JavaRDD, il y a une méthode getNumPartitions()).

174voto

user4601931 Points 1981

Vous devez appeler getNumPartitions() sur le RDD sous-jacent du DataFrame, par exemple, df.rdd.getNumPartitions() . Dans le cas de Scala, il s'agit d'une méthode sans paramètre : df.rdd.getNumPartitions .

3 votes

Moins le (), donc pas tout à fait correct - du moins pas avec le mode SCALA

3 votes

Est-ce que cela cause un conversion ( coûteux ) de DF a RDD ?

5 votes

C'est cher.

26voto

Ram Ghadiyaram Points 14932

dataframe.rdd.partitions.size est une autre alternative en dehors de df.rdd.getNumPartitions() o df.rdd.length .

Laissez-moi vous expliquer cela avec un exemple complet...

val x = (1 to 10).toList
val numberDF = x.toDF(“number”)
numberDF.rdd.partitions.size // => 4

Pour prouver que le nombre de partitions que nous avons obtenu avec ci-dessus ... enregistrer ce cadre de données comme csv

numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)

Voici comment les données sont séparées sur les différentes partitions.

Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10

Mise à jour :

@Hemanth a posé une bonne question dans le commentaire ... fondamentalement pourquoi le nombre de partitions est de 4 dans le cas ci-dessus

Réponse courte : Cela dépend des cas où vous exécutez. Comme j'ai utilisé local[4], j'ai obtenu 4 partitions.

Réponse longue :

J'exécutais le programme ci-dessus dans ma machine locale et j'ai utilisé master comme local[4] en fonction de cela il prenait comme 4 les partitions.

val spark = SparkSession.builder()
    .appName(this.getClass.getName)
    .config("spark.master", "local[4]").getOrCreate()

Si c'est spark-shell dans le fil principal, le nombre de partitions est de 2.

exemple : spark-shell --master yarn et j'ai tapé les mêmes commandes à nouveau

scala> val x = (1 to 10).toList
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val numberDF = x.toDF("number")
numberDF: org.apache.spark.sql.DataFrame = [number: int]

scala> numberDF.rdd.partitions.size
res0: Int = 2
  • ici 2 est parllelisme par défaut de spark
  • Sur la base de hashpartitioner spark décidera du nombre de partitions à distribuer. si vous exécutez en --master local et en fonction de votre Runtime.getRuntime.availableProcessors() c'est-à-dire local[Runtime.getRuntime.availableProcessors()] il va essayer d'allouer Si le nombre de processeurs dont vous disposez est de 12 (c'est à dire local[Runtime.getRuntime.availableProcessors()]) et vous avez une liste de 1 à 10 alors seulement 10 partitions seront créées.

NOTA:

*_Si vous êtes sur un ordinateur portable à 12 cœurs où j'exécute le programme spark et que par défaut le nombre de partitions/tâches est le nombre de tous les cœurs disponibles, soit 12, cela signifie que signifie `local[]ous"local[${Runtime.getRuntime.availableProcessors()}]")` mais dans ce mais dans ce cas, il n'y a que 10 numéros, donc il se limitera à 10._**

En gardant tous ces points à l'esprit, je vous suggère d'essayer par vous-même

0 votes

Merci pour cette excellente réponse. Je suis curieux de savoir pourquoi une liste de 10 nombres a été divisée en 4 partitions lors de sa conversion en DF. Pouvez-vous me donner une explication, s'il vous plaît ?

0 votes

Est-ce que c'est since local[4] I used, I got 4 partitions. toujours valable pour 3.x ? J'ai 200 partitions avec local[4].

0 votes

@Sergey Bushmanov : voir ici également documents sur les étincelles

9voto

Bhargav Kosaraju Points 144

Convertir en RDD puis obtenir la longueur des partitions

DF.rdd.partitions.length

0 votes

Peut-on obtenir le numéro de partition dans la fonction map ? comme rdd.map{ r => this.partitionNum } ? ?

6voto

Achyuth Points 1421
 val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

df.rdd.getNumPartitions

0 votes

Veuillez lire ceci comment répondre à pour avoir fourni une réponse de qualité.

0 votes

Peut-on obtenir le numéro de partition dans la fonction map ? comme rdd.map{ r => this.partitionNum } ? ?

0voto

Shantanu Kher Points 873

Une autre façon intéressante d'obtenir le nombre de partitions est d'utiliser la transformation 'mapPartitions'. Exemple de code -

val x = (1 to 10).toList
val numberDF = x.toDF()
numberDF.rdd.mapPartitions(x => Iterator[Int](1)).sum()

Les experts de Spark sont invités à commenter ses performances.

0 votes

Peut-on obtenir le numéro de partition dans la fonction map ? comme rdd.map{ r => this.partitionNum } ? ?

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X