3 votes

Division d'une rangée en plusieurs rangées dans spark-shell

J'ai importé des données dans un dataframe Spark dans spark-shell. Les données sont remplies comme suit :

Col1 | Col2 | Col3 | Col4
A1   | 11   | B2   | a|b;1;0xFFFFFF
A1   | 12   | B1   | 2
A2   | 12   | B2   | 0xFFF45B

Ici, dans Col4, les valeurs sont de différentes sortes et je veux les séparer comme (supposons que "a|b" est un type d'alphabet, "1 ou 2" est un type de chiffre et "0xFFFFFF ou 0xFFF45B" est un type de numéro hexadécimal) :

Donc, la sortie devrait être :

Col1 | Col2 | Col3 | alphabets | digits | hexadecimal
A1   | 11   | B2   | a         | 1      | 0xFFFFFF
A1   | 11   | B2   | b         | 1      | 0xFFFFFF
A1   | 12   | B1   |           | 2      | 
A2   | 12   | B2   |           |        | 0xFFF45B

J'espère que ma requête est claire pour vous et j'utilise spark-shell. Merci d'avance.

2voto

Wilmerton Points 987

Modifier après avoir obtenu ceci réponse sur la façon de faire une référence arrière dans regexp_replace .

Vous pouvez utiliser regexp_replace avec une référence arrière, alors split deux fois et explode . C'est, imo, plus propre que ma solution originale.

val df = List(
    ("A1"   , "11"   , "B2"   , "a|b;1;0xFFFFFF"),
    ("A1"   , "12"   , "B1"   , "2"),
    ("A2"   , "12"   , "B2"   , "0xFFF45B")
  ).toDF("Col1" , "Col2" , "Col3" , "Col4")

val regExStr = "^([A-z|]+)?;?(\\d+)?;?(0x.*)?$"
val res = df
  .withColumn("backrefReplace",
       split(regexp_replace('Col4,regExStr,"$1;$2;$3"),";"))
  .select('Col1,'Col2,'Col3,
       explode(split('backrefReplace(0),"\\|")).as("letter"),
       'backrefReplace(1)                      .as("digits"),
       'backrefReplace(2)                      .as("hexadecimal")
  )

+----+----+----+------+------+-----------+
|Col1|Col2|Col3|letter|digits|hexadecimal|
+----+----+----+------+------+-----------+
|  A1|  11|  B2|     a|     1|   0xFFFFFF|
|  A1|  11|  B2|     b|     1|   0xFFFFFF|
|  A1|  12|  B1|      |     2|           |
|  A2|  12|  B2|      |      |   0xFFF45B|
+----+----+----+------+------+-----------+

vous devez toujours remplacer les chaînes vides par null cependant...


Réponse précédente (quelqu'un pourrait encore la préférer) :

Voici une solution qui s'en tient aux DataFrames mais qui est aussi assez désordonnée. Vous pouvez d'abord utiliser regexp_extract trois fois (est-il possible d'en faire moins avec la référence arrière ?), et enfin split sur "|" et explode . Notez que vous avez besoin d'une coalesce pour explode pour tout renvoyer (vous pouvez toujours modifier les chaînes vides dans le fichier letter a null dans cette solution).

val res = df
  .withColumn("alphabets",  regexp_extract('Col4,"(^[A-z|]+)?",1))
  .withColumn("digits",     regexp_extract('Col4,"^([A-z|]+)?;?(\\d+)?;?(0x.*)?$",2))
  .withColumn("hexadecimal",regexp_extract('Col4,"^([A-z|]+)?;?(\\d+)?;?(0x.*)?$",3))
  .withColumn("letter",
     explode(
       split(
         coalesce('alphabets,lit("")),
         "\\|"
       )
     )
   )

res.show    

+----+----+----+--------------+---------+------+-----------+------+
|Col1|Col2|Col3|          Col4|alphabets|digits|hexadecimal|letter|
+----+----+----+--------------+---------+------+-----------+------+
|  A1|  11|  B2|a|b;1;0xFFFFFF|      a|b|     1|   0xFFFFFF|     a|
|  A1|  11|  B2|a|b;1;0xFFFFFF|      a|b|     1|   0xFFFFFF|     b|
|  A1|  12|  B1|             2|     null|     2|       null|      |
|  A2|  12|  B2|      0xFFF45B|     null|  null|   0xFFF45B|      |
+----+----+----+--------------+---------+------+-----------+------+

Note : La partie regexp pourrait être bien meilleure avec une référence arrière, donc si quelqu'un sait comment le faire, merci de commenter !

0voto

Tzach Zohar Points 6701

Je ne suis pas sûr que cela soit faisable tout en restant à 100% avec les Dataframes, voici une solution (un peu désordonnée ?) utilisant les RDDs pour la division elle-même :

import org.apache.spark.sql.functions._
import sqlContext.implicits._

// we switch to RDD to perform the split of Col4 into 3 columns
val rddWithSplitCol4 = input.rdd.map { r =>
  val indexToValue = r.getAs[String]("Col4").split(';').map {
    case s if s.startsWith("0x") => 2 -> s
    case s if s.matches("\\d+") => 1 -> s
    case s => 0 -> s
  }
  val newCols: Array[String] = indexToValue.foldLeft(Array.fill[String](3)("")) {
    case (arr, (index, value)) => arr.updated(index, value)
  }
  (r.getAs[String]("Col1"), r.getAs[Int]("Col2"), r.getAs[String]("Col3"), newCols(0), newCols(1), newCols(2))
}

// switch back to Dataframe and explode alphabets column
val result = rddWithSplitCol4
  .toDF("Col1", "Col2", "Col3", "alphabets", "digits", "hexadecimal")
  .withColumn("alphabets", explode(split(col("alphabets"), "\\|")))

result.show(truncate = false)
// +----+----+----+---------+------+-----------+
// |Col1|Col2|Col3|alphabets|digits|hexadecimal|
// +----+----+----+---------+------+-----------+
// |A1  |11  |B2  |a        |1     |0xFFFFFF   |
// |A1  |11  |B2  |b        |1     |0xFFFFFF   |
// |A1  |12  |B1  |         |2     |           |
// |A2  |12  |B2  |         |      |0xFFF45B   |
// +----+----+----+---------+------+-----------+

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X