90 votes

Diviser une colonne de chaîne de données Spark Dataframe en plusieurs colonnes

J'ai vu plusieurs personnes suggérer que Dataframe.explode est une façon utile de le faire, mais il en résulte plus de lignes que le dataframe original, ce qui n'est pas du tout ce que je veux. Je veux simplement faire l'équivalent Dataframe du très simple :

rdd.map(lambda row: row + [row.my_str_col.split('-')])

qui prend quelque chose qui ressemble à :

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

et le convertit en ceci :

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Je suis conscient de pyspark.sql.functions.split() mais cela donne une colonne de type tableau imbriqué au lieu de deux colonnes de niveau supérieur comme je le souhaite.

Idéalement, je souhaite que ces nouvelles colonnes soient également nommées.

145voto

Peter Gaultney Points 78

pyspark.sql.functions.split() est la bonne approche ici - vous devez simplement aplatir la colonne imbriquée ArrayType en plusieurs colonnes de niveau supérieur. Dans ce cas, où chaque tableau ne contient que 2 éléments, c'est très facile. Il suffit d'utiliser Column.getItem() pour récupérer chaque partie du tableau comme une colonne à part entière :

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))

Le résultat sera :

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Je ne sais pas comment résoudre ce problème dans un cas général où les tableaux imbriqués n'ont pas la même taille d'une rangée à l'autre.

2 votes

Existe-t-il un moyen de placer les éléments restants dans une seule colonne ? split_col.getItem(2 - n) dans une troisième colonne. Je suppose que quelque chose comme la boucle ci-dessus pour créer des colonnes pour tous les éléments puis les concaténer pourrait fonctionner, mais je ne sais pas si c'est très efficace ou non.

0 votes

Utilisez df.withColumn('NAME_remaining', pyspark.sql.functions.split(df[my_str_col'],'-',3).getItem(2) pour obtenir les éléments restants. spark.apache.org/docs/latest/api/sql/index.html

0 votes

J'ai découvert que si vous essayez de réaffecter l'un des éléments fractionnés à la colonne d'origine, vous devez renommer la colonne d'origine avec withColumnRenamed() avant le fractionnement afin d'éviter une erreur apparemment liée à issues.apache.org/jira/browse/SPARK-14948 .

44voto

pault Points 12252

Voici une solution au cas général qui ne nécessite pas de connaître la longueur du tableau à l'avance, en utilisant la méthode suivante collect ou en utilisant udf s. Malheureusement, cela ne fonctionne que pour spark à partir de la version 2.1, car il nécessite l'utilisation de l'option posexplode fonction.

Supposons que vous ayez le DataFrame suivant :

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

Divisez le letters et ensuite utiliser posexplode pour exploser le tableau résultant ainsi que la position dans le tableau. Utilisez ensuite pyspark.sql.functions.expr pour saisir l'élément à l'index pos dans ce tableau.

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

Maintenant, nous créons deux nouvelles colonnes à partir de ce résultat. La première est le nom de notre nouvelle colonne, qui sera une concaténation de letter et l'index dans le tableau. La deuxième colonne sera la valeur à l'indice correspondant dans le tableau. Nous obtenons cette dernière en exploitant la fonctionnalité de pyspark.sql.functions.expr ce qui nous permet utiliser les valeurs des colonnes comme paramètres .

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

Maintenant nous pouvons juste groupBy le site num et pivot le DataFrame. En mettant tout ça ensemble, on obtient :

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+

15voto

soatz Points 123

Voici une autre approche, au cas où vous voudriez diviser une chaîne avec un délimiteur.

import pyspark.sql.functions as f

df = spark.createDataFrame([("1:a:2001",),("2:b:2002",),("3:c:2003",)],["value"])
df.show()
+--------+
|   value|
+--------+
|1:a:2001|
|2:b:2002|
|3:c:2003|
+--------+

df_split = df.select(f.split(df.value,":")).rdd.flatMap(
              lambda x: x).toDF(schema=["col1","col2","col3"])

df_split.show()
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   a|2001|
|   2|   b|2002|
|   3|   c|2003|
+----+----+----+

Je ne pense pas que ce va-et-vient vers les RDDs va vous ralentir... Ne vous inquiétez pas non plus de la dernière spécification de schéma : elle est facultative, vous pouvez l'éviter en généralisant la solution aux données dont la taille des colonnes est inconnue.

0 votes

Comment puis-je faire cela en scala ? Je suis coincé avec la fonction lambda flatMap.

1 votes

Attention le modèle est donné comme une expression régulière, donc vous devez utiliser \ pour les caractères spéciaux

1 votes

Si vous ne voulez pas vous référer à df à l'intérieur de votre expression, vous pouvez passer le nom de la colonne à split c'est-à-dire df.select(f.split("value",":"))...

0voto

Jasminyas Points 1

J'ai trouvé une solution pour le cas inégal général (ou lorsque vous obtenez les colonnes imbriquées, obtenues avec la fonction .split()) :

import pyspark.sql.functions as f

@f.udf(StructType([StructField(col_3, StringType(), True),
                   StructField(col_4, StringType(), True)]))

 def splitCols(array):
    return array[0],  ''.join(array[1:len(array)])

 df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
        .select(df.columns+['name.*'])

En fait, il vous suffit de sélectionner toutes les colonnes précédentes + les colonnes imbriquées 'nom_colonne.*' et vous les obtiendrez comme deux colonnes de premier niveau dans ce cas.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X