Dans l'extrait suivant, l'élément tryParquet
essaie de charger un ensemble de données à partir d'un fichier Parquet s'il existe. Si ce n'est pas le cas, elle calcule, conserve et renvoie le plan du jeu de données qui a été fourni :
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
sealed trait CustomRow
case class MyRow(
id: Int,
name: String
) extends CustomRow
val ds: Dataset[MyRow] =
Seq((1, "foo"),
(2, "bar"),
(3, "baz")).toDF("id", "name").as[MyRow]
def tryParquet[T <: CustomRow](session: SparkSession, path: String, target: Dataset[T]): Dataset[T] =
Try(session.read.parquet(path)) match {
case Success(df) => df.as[T] // <---- compile error here
case Failure(_) => {
target.write.parquet(path)
target
}
}
val readyDS: Dataset[MyRow] =
tryParquet(spark, "/path/to/file.parq", ds)
Cependant, cela produit une erreur de compilation sur df.as[T]
:
Impossible de trouver l'encodeur pour le type stocké dans un ensemble de données. Les types primitifs (Int, String, etc.) et les types de produits (classes de cas) sont pris en charge par l'importation de spark.implicits._.
La prise en charge de la sérialisation d'autres types sera ajoutée dans les prochaines versions.
case Success(df) => df.as[T]
On peut contourner ce problème en faisant tryParquet
moulage df
pour retourner un DataFrame
et laisser l'appelant faire un cast vers le constructeur désiré. Cependant, existe-t-il une solution dans le cas où nous voulons que le type soit géré en interne par la fonction ?