86 votes

Pyspark : Diviser les colonnes d'un tableau multiple en rangées

J'ai un cadre de données qui comporte une ligne et plusieurs colonnes. Certaines de ces colonnes sont des valeurs individuelles, d'autres des listes. Toutes les colonnes de liste ont la même longueur. Je veux diviser chaque colonne de liste en une ligne distincte, tout en conservant telle quelle les colonnes qui ne sont pas des listes.

Exemple DF :

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# |  a|        b|        c|  d|
# +---+---------+---------+---+
# |  1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

Ce que je veux :

+---+---+----+------+
|  a|  b|  c |    d |
+---+---+----+------+
|  1|  1|  7 |  foo |
|  1|  2|  8 |  foo |
|  1|  3|  9 |  foo |
+---+---+----+------+

Si je n'avais qu'une seule colonne de liste, ce serait facile en faisant simplement un explode :

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# |  a|  b|        c|  d|
# +---+---+---------+---+
# |  1|  1|[7, 8, 9]|foo|
# |  1|  2|[7, 8, 9]|foo|
# |  1|  3|[7, 8, 9]|foo|
# +---+---+---------+---+

Cependant, si j'essaie également de explode el c je me retrouve avec un cadre de données dont la longueur est le carré de ce que je veux :

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# |  a|  b|  c|  d|
# +---+---+---+---+
# |  1|  1|  7|foo|
# |  1|  1|  8|foo|
# |  1|  1|  9|foo|
# |  1|  2|  7|foo|
# |  1|  2|  8|foo|
# |  1|  2|  9|foo|
# |  1|  3|  7|foo|
# |  1|  3|  8|foo|
# |  1|  3|  9|foo|
# +---+---+---+---+

Ce que je veux, c'est - pour chaque colonne, prendre le nième élément du tableau dans cette colonne et l'ajouter à une nouvelle ligne. J'ai essayé de mapper un explode sur toutes les colonnes du cadre de données, mais cela ne semble pas fonctionner non plus :

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

106voto

user6910411 Points 32156

Spark >= 2.4

Vous pouvez remplacer zip_ udf con arrays_zip fonction

from pyspark.sql.functions import arrays_zip, col, explode

(df
    .withColumn("tmp", arrays_zip("b", "c"))
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.b"), col("tmp.c"), "d"))

Spark < 2,4

Avec DataFrames et UDF :

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
  lambda x, y: list(zip(x, y)),
  ArrayType(StructType([
      # Adjust types to reflect data types
      StructField("first", IntegerType()),
      StructField("second", IntegerType())
  ]))
)

(df
    .withColumn("tmp", zip_("b", "c"))
    # UDF output cannot be directly passed to explode
    .withColumn("tmp", explode("tmp"))
    .select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

Avec RDDs :

(df
    .rdd
    .flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
    .toDF(["a", "b", "c", "d"]))

Les deux solutions sont inefficaces en raison de la surcharge de communication de Python. Si la taille des données est fixe, vous pouvez faire quelque chose comme ceci :

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor 
reduce(
    DataFrame.unionAll, 
    (df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
        for i in range(n))
).toDF("a", "b", "c", "d")

ou même :

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
    struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
    for i in range(n)
]))

(df
    .withColumn("tmp", tmp)
    .select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

Cela devrait être beaucoup plus rapide qu'avec UDF ou RDD. Généralisé pour supporter un nombre arbitraire de colonnes :

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
    return explode(array(*[
        struct(*[col(c).getItem(i).alias(c) for c in colnames])
        for i in range(n)
    ]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))

0 votes

Comment la solution pour Spark >= 2.4 peut-elle réellement fonctionner ? La documentation indique que l'entrée explode "doit être de type array ou map, et non pas de type string", en citant littéralement l'exception qu'elle soulève sinon. spark.apache.org/docs/latest/api/python/

0 votes

Comment gérer une liste de taille inégale dans une colonne différente ? L'exigence est de remplacer la valeur par -1 pour une liste de taille plus courte.

11voto

David Points 5064

Vous devez utiliser flatMap pas map car vous voulez créer plusieurs lignes de sortie à partir de chaque ligne d'entrée.

from pyspark.sql import Row
def dualExplode(r):
    rowDict = r.asDict()
    bList = rowDict.pop('b')
    cList = rowDict.pop('c')
    for b,c in zip(bList, cList):
        newDict = dict(rowDict)
        newDict['b'] = b
        newDict['c'] = c
        yield Row(**newDict)

df_split = sqlContext.createDataFrame(df.rdd.flatMap(dualExplode))

0 votes

Si le premier df a 3 valeurs et le second df a 2 valeurs, notre zip renvoie deux paires au lieu de 3. Pouvez-vous nous conseiller ?

0 votes

Zip associe le premier élément d'un objet avec le premier élément d'un autre objet, le deuxième avec le deuxième, etc. jusqu'à ce que l'un des objets soit à court d'éléments. Dans votre cas, après 2 valeurs. En d'autres termes, il appariera les éléments jusqu'à ce qu'il n'y ait plus d'éléments à apparier. Pour vous donner des suggestions, j'aurais besoin de savoir comment vous voulez que votre programme traite l'élément non apparié (par exemple, voulez-vous un zéro dans le deuxième ensemble ?) De plus, il n'y a qu'un seul df dans cet exemple. Si votre question est si différente de celle-ci, il est probablement préférable de poser une autre question.

1 votes

Merci @David pour votre réponse. J'ai trouvé la solution. L'utilisation d'Izip m'a aidé à résoudre ce problème. Mais j'apprécie quand même votre réponse.

6voto

Ani Menon Points 5876

Une doublure (pour Spark>=2.4.0 ) :

df.withColumn("bc", arrays_zip("b","c"))
  .select("a", explode("bc").alias("tbc"))
  .select("a", col"tbc.b", "tbc.c").show()

Importation requise :

from pyspark.sql.functions import arrays_zip


Étapes -

  1. Créez une colonne bc qui est un array_zip de colonnes b y c
  2. Explosion bc pour obtenir une structure tbc
  3. Sélectionnez les colonnes requises a , b y c (tous éclatés selon les besoins).

Sortie :

> df.withColumn("bc", arrays_zip("b","c")).select("a", explode("bc").alias("tbc")).select("a", "tbc.b", col("tbc.c")).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  7|
|  1|  2|  8|
|  1|  3|  9|
+---+---+---+

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X