2 votes

Comparaison des valeurs des colonnes dans le cadre de données Spark

J'ai un cadre de données qui contient un grand nombre d'enregistrements. Dans ce DF, un enregistrement peut être répété plusieurs fois et chaque fois qu'il est mis à jour, le champ de la dernière mise à jour contient la date à laquelle l'enregistrement a été modifié.

Nous avons un groupe de colonnes sur lesquelles nous voulons comparer les lignes d'identifiants similaires. Pendant cette comparaison, nous voulons capturer quels sont les champs/colonnes qui ont changé entre l'enregistrement précédent et l'enregistrement actuel et capturer cela dans une colonne "colonnes_mises_à_jour" de l'enregistrement mis à jour. Comparez ce deuxième enregistrement au troisième et identifiez les colonnes mises à jour et capturez-les dans le champ "colonnes_mises à jour" du troisième enregistrement, continuez de la même manière jusqu'au dernier enregistrement de cet identifiant et faites la même chose pour chaque identifiant qui a plus d'une entrée.

Initialement, nous avons regroupé les colonnes et créé un hash de ce groupe de colonnes et comparé avec les valeurs de hash de la ligne suivante, de cette façon, il m'aide à identifier les enregistrements qui ont des mises à jour, mais je veux les colonnes qui ont été mises à jour.

Ici, je partage quelques données, qui sont le résultat attendu et c'est comment les données finales devraient ressembler après l'ajout de colonnes mises à jour (ici, je peux dire, utiliser les colonnes Col1, Col2, Col3, Col4 et Col5 pour la comparaison entre deux lignes) :

enter image description here

Je veux faire cela de manière efficace. Quelqu'un a-t-il essayé quelque chose comme ça ?

Je cherche de l'aide !

~Krish.

2voto

werner Points 1341

A fenêtre peut être utilisé.

L'idée est de regrouper les données par ID et le trier par DERNIÈRE MISE À JOUR , copiez les valeurs de la ligne précédente (si elle existe) dans la ligne actuelle, puis comparez les données copiées avec les valeurs actuelles.

val data = ... //the dataframe has the columns ID,Col1,Col2,Col3,Col4,Col5,LAST_UPDATED,IS_DELETED

val fieldNames = data.schema.fieldNames.dropRight(1) //1
val columns = fieldNames.map(f => col(f))
val windowspec = Window.partitionBy("ID").orderBy("LAST_UPDATED") //2
def compareArrayUdf() = ... //3

val result = data
  .withColumn("cur", array(columns: _*)) //4
  .withColumn("prev", lag($"cur", 1).over(windowspec)) //5
  .withColumn("updated_columns", compareArrayUdf()($"cur", $"prev")) //6
  .drop("cur", "prev") //7
  .orderBy("LAST_UPDATED")

Remarques :

  1. créer une liste de tous les champs à comparer. Tous les champs sauf le dernier ( DERNIÈRE MISE À JOUR ) sont utilisés
  2. créer une fenêtre qui est divisée par ID et chaque partition est triée par DERNIÈRE MISE À JOUR
  3. créer un udf qui compare deux tableaux et fait correspondre les différences découvertes aux noms des champs, code voir ci-dessous
  4. créer une nouvelle colonne qui contient toutes les valeurs qui doivent être comparées
  5. créer une nouvelle colonne qui contient toutes les valeurs de l'élément précédent (en utilisant l'option décalage -fonction) qui doivent être comparées. La ligne précédente est la ligne avec la même ID et le plus grand DERNIÈRE MISE À JOUR qui est plus petit que celui qui est en cours. Ce champ peut être nul
  6. compare les deux nouvelles colonnes et place le résultat dans le tableau suivant colonnes mises à jour
  7. déposer les deux colonnes intermédiaires créées aux étapes 3 et 4

El compareArraysUdf es

def compareArray(cur: mutable.WrappedArray[String], prev: mutable.WrappedArray[String]): String = {
  if (prev == null || cur == null) return ""
  val res = new StringBuilder
  for (i <- cur.indices) {
    if (!cur(i).contentEquals(prev(i))) {
      if (res.nonEmpty) res.append(",")
      res.append(fieldNames(i))
    }
  }
  res.toString()
}
def compareArrayUdf() = udf[String, mutable.WrappedArray[String], mutable.WrappedArray[String]](compareArray)

0voto

Simon Points 4883

Vous pouvez joindre votre DataFrame ou DataSet à lui-même, en joignant les lignes dont l'identifiant est le même dans les deux lignes et dont la version de la ligne de gauche est i et la version de la ligne de droite est i+1 . Voici un exemple

case class T(id: String, version: Int, data: String)

val data = Seq(T("1", 1, "d1-1"), T("1", 2, "d1-2"), T("2", 1, "d2-1"), T("2", 2, "d2-2"), T("2", 3, "d2-3"), T("3", 1, "d3-1"))
data: Seq[T] = List(T(1,1,d1-1), T(1,2,d1-2), T(2,1,d2-1), T(2,2,d2-2), T(2,3,d2-3), T(3,1,d3-1))

val ds = data.toDS

val joined = ds.as("ds1").join(ds.as("ds2"), $"ds1.id" === $"ds2.id" && (($"ds1.version"+1) === $"ds2.version"))

Ensuite, vous pouvez référencer les colonnes dans le nouveau DataFrame/DataSet comme suit $"ds1.data y $"ds2.data etc.

Pour trouver les lignes où les données ont changé d'une version à l'autre, vous pouvez effectuer les opérations suivantes

joined.filter($"ds1.data" !== $"ds2.data")

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X