Voici une solution au cas général qui ne nécessite pas de connaître la longueur du tableau à l'avance, en utilisant la méthode suivante collect
ou en utilisant udf
s. Malheureusement, cela ne fonctionne que pour spark
à partir de la version 2.1, car il nécessite l'utilisation de l'option posexplode
fonction.
Supposons que vous ayez le DataFrame suivant :
df = spark.createDataFrame(
[
[1, 'A, B, C, D'],
[2, 'E, F, G'],
[3, 'H, I'],
[4, 'J']
]
, ["num", "letters"]
)
df.show()
#+---+----------+
#|num| letters|
#+---+----------+
#| 1|A, B, C, D|
#| 2| E, F, G|
#| 3| H, I|
#| 4| J|
#+---+----------+
Divisez le letters
et ensuite utiliser posexplode
pour exploser le tableau résultant ainsi que la position dans le tableau. Utilisez ensuite pyspark.sql.functions.expr
pour saisir l'élément à l'index pos
dans ce tableau.
import pyspark.sql.functions as f
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.show()
#+---+------------+---+---+
#|num| letters|pos|val|
#+---+------------+---+---+
#| 1|[A, B, C, D]| 0| A|
#| 1|[A, B, C, D]| 1| B|
#| 1|[A, B, C, D]| 2| C|
#| 1|[A, B, C, D]| 3| D|
#| 2| [E, F, G]| 0| E|
#| 2| [E, F, G]| 1| F|
#| 2| [E, F, G]| 2| G|
#| 3| [H, I]| 0| H|
#| 3| [H, I]| 1| I|
#| 4| [J]| 0| J|
#+---+------------+---+---+
Maintenant, nous créons deux nouvelles colonnes à partir de ce résultat. La première est le nom de notre nouvelle colonne, qui sera une concaténation de letter
et l'index dans le tableau. La deuxième colonne sera la valeur à l'indice correspondant dans le tableau. Nous obtenons cette dernière en exploitant la fonctionnalité de pyspark.sql.functions.expr
ce qui nous permet utiliser les valeurs des colonnes comme paramètres .
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.show()
#+---+-------+---+
#|num| name|val|
#+---+-------+---+
#| 1|letter0| A|
#| 1|letter1| B|
#| 1|letter2| C|
#| 1|letter3| D|
#| 2|letter0| E|
#| 2|letter1| F|
#| 2|letter2| G|
#| 3|letter0| H|
#| 3|letter1| I|
#| 4|letter0| J|
#+---+-------+---+
Maintenant nous pouvons juste groupBy
le site num
et pivot
le DataFrame. En mettant tout ça ensemble, on obtient :
df.select(
"num",
f.split("letters", ", ").alias("letters"),
f.posexplode(f.split("letters", ", ")).alias("pos", "val")
)\
.drop("val")\
.select(
"num",
f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
f.expr("letters[pos]").alias("val")
)\
.groupBy("num").pivot("name").agg(f.first("val"))\
.show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#| 1| A| B| C| D|
#| 3| H| I| null| null|
#| 2| E| F| G| null|
#| 4| J| null| null| null|
#+---+-------+-------+-------+-------+