155 votes

Comment stocker des objets personnalisés dans Dataset?

Selon l'Introduction de l'Étincelle ensembles de données:

Comme nous nous réjouissons de Spark 2.0, nous avons l'intention qui est passionnant avec des améliorations à des ensembles de données, en particulier: ... Personnalisé codeurs – alors que nous avons actuellement de générer automatiquement les encodeurs pour une grande variété de types, nous aimerions ouvrir une API pour les objets personnalisés.

et les tentatives de magasin de type personnalisé en Dataset de plomb à la suite d'erreur comme:

Impossible de trouver le codeur de type stockées dans une base de données. Les types primitifs (Int, String, etc) et types de Produit (cas des classes) sont pris en charge par l'importation de sqlContext.implicites._ Soutien pour la sérialisation d'autres types seront ajoutés dans les prochaines versions

ou:

Java.lang.UnsupportedOperationException: n'a Pas d'Encodeur trouvé ....

Existe-t-il des solutions de contournement?


Remarque cette question n'existe que comme un point d'entrée pour une Communauté Wiki réponse. Hésitez pas à actualiser et d'améliorer à la fois la question et la réponse.

250voto

Alec Points 23780

Mise à jour

Cette réponse est toujours valable et instructif, même si les choses sont maintenant mieux depuis la 2.2/2.3, qui ajoute intégré dans le support codeur pour Set, Seq, Map, Date, Timestamp, et BigDecimal. Si vous vous en tenez à faire des types avec uniquement des classes de cas et de la Scala types, vous devez être bien avec juste l'implicite en SQLImplicits.


Malheureusement, pratiquement rien n'a été ajouté pour aider à cela. Recherche d' @since 2.0.0 en Encoders.scala ou SQLImplicits.scala trouve les choses surtout à voir avec les types primitifs (et quelques réglages de classes de cas). Donc, première chose à dire: il n'y a actuellement pas de véritable bonne prise en charge personnalisée de la classe des encodeurs. Avec cela de la route, ce qui suit est quelques trucs qui ne aussi bon travail que nous pouvons espérer, compte tenu de ce que nous avons actuellement à notre disposition. Comme un premier avertissement: cela ne fonctionne pas parfaitement, et je vais faire de mon mieux pour rendre toutes les limitations clairs et directs.

Quel est le problème exactement

Lorsque vous voulez faire un jeu de données, la Spark "exige un codeur (pour convertir une JVM objet de type T et de l'intérieur Spark SQL représentation) qui est généralement créé automatiquement par le biais implicites à partir d'un SparkSession, ou peut être créé explicitement en appelant des méthodes statiques sur Encoders" (extrait de la docs sur createDataset). Un codeur prendra la forme Encoder[T]T est le type d'encodage. La première suggestion est d'ajouter import spark.implicits._ (ce qui vous donne ces implicite encodeurs) et la deuxième suggestion est de transmettre explicitement dans l'implicite du codeur à l'aide de cet ensemble de codeur fonctions connexes.

Il n'y a pas de codeur disponible pour des cours réguliers, de sorte

import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

vous donnera la implicites suivantes liées erreur de compilation:

Impossible de trouver le codeur de type stockées dans une base de données. Les types primitifs (Int, String, etc) et types de Produit (cas des classes) sont pris en charge par l'importation de sqlContext.implicites._ Soutien pour la sérialisation d'autres types seront ajoutés dans les prochaines versions

Toutefois, si vous enroulez quel que soit le type que vous venez d'utiliser pour obtenir l'erreur ci-dessus dans certaines classe qui étend la classe Product, l'erreur de prêter à confusion, de retard de l'exécution, de sorte

import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))

Compile très bien, mais échoue lors de l'exécution avec

java.lang.UnsupportedOperationException: n'a Pas d'Encodeur trouvé pour MyObj

La raison pour cela est que les codeurs Étincelle crée avec les implicites sont en réalité qu'un fait au moment de l'exécution (via scala relfection). Dans ce cas, tous Étincelle vérifie au moment de la compilation, c'est que l'extérieur de la classe s'étend Product (toutes les classes), et seulement réalise au moment de l'exécution qu'il ne sait toujours pas quoi faire avec MyObj (le même problème se produit si j'ai essayé de faire un Dataset[(Int,MyObj)] - Spark attend jusqu'à ce que l'exécution pour barf sur MyObj). Ce sont des problèmes centraux qui sont dans le besoin d'être corrigé:

  • certaines classes qui étendent Product de la compilation en dépit de toujours s'écraser au moment de l'exécution et
  • il n'y a aucun moyen de passer en custom encodeurs pour les types imbriqués (je n'ai aucun moyen de l'alimentation de l'Étincelle d'un codeur pour seulement MyObj telle qu'elle sait alors comment coder Wrap[MyObj] ou (Int,MyObj)).

Suffit d'utiliser kryo

La solution tout le monde suggère est d'utiliser l' kryo de l'encodeur.

import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))

Cela devient assez fastidieux rapide. Surtout si votre code est de manipuler toutes sortes de jeux de données, l'assemblage, le groupement des etc. Vous vous retrouvez en accumulant un tas de extra implicites. Alors, pourquoi ne pas simplement faire un implicite que tout cela est-il automatiquement?

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
  org.apache.spark.sql.Encoders.kryo[A](ct)

Et maintenant, il semble que je peux faire presque tout ce que je veux (l'exemple ci-dessous ne fonctionne pas dans l' spark-shellspark.implicits._ est automatiquement importé)

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!

Ou presque. Le problème est que l'utilisation d' kryo conduit à Étincelle juste le stockage de chaque ligne dans le jeu de données comme un plat d'objet binaire. Pour map, filter, foreach c'est suffisant, mais pour des opérations comme join, la Spark a vraiment besoin de ces pour être répartis en colonnes. L'inspection du schéma d' d2 ou d3, que vous voyez là est juste une colonne binaire:

d2.printSchema
// root
//  |-- value: binary (nullable = true)

Solution partielle pour les n-uplets

Donc, en utilisant la magie des implicites dans Scala (de plus en 6.26.3 la Surcharge de Résolution), je peux me faire une série d'implicites qui va le faire aussi bien que possible, au moins pour les tuples, et va bien travailler avec l'existant implicites:

import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._  // we can still take advantage of all the old implicits

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)

implicit def tuple2[A1, A2](
  implicit e1: Encoder[A1],
           e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)

implicit def tuple3[A1, A2, A3](
  implicit e1: Encoder[A1],
           e2: Encoder[A2],
           e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)

// ... you can keep making these

Puis, armés de ces implicites, je peux faire mon exemple ci-dessus, bien qu'avec une certaine colonne de renommage

class MyObj(val i: Int)

val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")

Je n'ai pas encore compris comment faire pour obtenir l'attend tuple noms (_1, _2, ...) par défaut sans les renommer - si quelqu'un veut jouer avec cela, c' est là que le nom de l' "value" se présente et c' est là que le tuple noms sont généralement ajoutés. Toutefois, le point essentiel est que j'ai maintenant une belle structuré schéma:

d4.printSchema
// root
//  |-- _1: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)
//  |-- _2: struct (nullable = false)
//  |    |-- _1: integer (nullable = true)
//  |    |-- _2: binary (nullable = true)

Donc, en résumé, cette solution de contournement:

  • nous permet d'obtenir des colonnes distinctes pour les n-uplets (si nous pouvons nous rejoindre sur les tuples de nouveau, yay!)
  • on peut de nouveau compter sur le implicites (donc pas besoin de passer en kryo partout)
  • est presque entièrement rétro-compatible avec import spark.implicits._ (avec quelques renommer impliqués)
  • ne pas les laisser nous rejoindre sur l' kyro sérialisé binaire colonnes, a fortiori sur les champs de ceux qui peuvent avoir
  • a la désagréable effet de bord de la renommant certains de le tuple les colonnes "valeur" (si nécessaire, cela peut être annulée par la conversion de .toDF, de la spécification de nouveaux noms de colonnes, et de le convertir en un ensemble de données et les noms de schéma semblent être conservée à travers les jointures, où ils sont le plus nécessaires).

Solution partielle pour les classes en général

Celui-ci est moins agréable et n'a pas de bonne solution. Cependant, maintenant que nous avons le tuple solution ci-dessus, j'ai un pressentiment de la conversion implicite de la solution d'une autre réponse sera un peu moins douloureux aussi, puisque vous pouvez convertir votre plus complexe classes de n-uplets. Puis, après la création du jeu de données, vous auriez probablement renommer les colonnes à l'aide de la dataframe approche. Si tout va bien, c'est vraiment une amélioration, car je peux maintenant effectuer des jointures sur les champs de mes classes. Si j'avais juste utilisé un plat binaires kryo sérialiseur qui n'aurait pas été possible.

Voici un exemple qui fait un peu de tout: j'ai une classe MyObj qui a des champs de types Int, java.util.UUID, et Set[String]. Le premier prend soin de lui-même. La seconde, bien que je ne pouvais sérialiser l'aide d' kryo serait plus utile s'il est stocké en tant que String (depuis UUIDs sont généralement quelque chose que je souhaite rejoindre contre). La troisième vraiment seulement le fait dans une colonne binaire.

class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])

// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])

// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
  new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)

Maintenant, je peux créer un jeu de données avec un joli schéma en utilisant cette machine:

val d = spark.createDataset(Seq[MyObjEncoded](
  new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
  new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]

Et le schéma montre moi j'ai des colonnes avec les noms corrects et avec la première, les deux choses que je peux rejoindre contre.

d.printSchema
// root
//  |-- i: integer (nullable = false)
//  |-- u: string (nullable = true)
//  |-- s: binary (nullable = true)

32voto

zero323 Points 5699
  1. L'utilisation de générique encodeurs.

    Il y a deux génériques encodeurs disponible pour le moment kryo et javaSerialization lorsque celle-ci est explicitement décrit comme:

    extrêmement inefficace et ne doit être utilisée qu'en dernier recours.

    En supposant classe suivante

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    vous pouvez utiliser ces codeurs en ajoutant implicite de l'encodeur:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    qui peut être utilisé comme suit:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    Il stocke les objets qu' binary colonne donc, lorsqu'ils sont convertis en DataFrame vous obtenez schéma suivant:

    root
     |-- value: binary (nullable = true)
    

    Il est aussi possible d'encoder les tuples à l'aide d' kryo codeur pour le domaine spécifique:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Veuillez noter que nous ne dépendent pas implicite encodeurs ici, mais passer de l'encodeur explicitement cette les plus susceptibles de ne pas travailler avec toDS méthode.

  2. À l'aide de conversions implicites:

    Fournir des conversions implicites entre la représentation qui peut être codé et la classe personnalisée, par exemple:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

Questions connexes:

13voto

Alexi Points 57

Vous pouvez utiliser UDTRegistration et puis des Classes de Cas, les n-Uplets, etc... tout fonctionne correctement avec votre Type Défini par l'Utilisateur!

Dites que vous voulez utiliser un Enum:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

Inscrire de cette façon:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

Ensuite l'UTILISER!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

Dites que vous voulez utiliser un Polymorphe Enregistrement:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

... et l'utiliser comme ceci:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Vous pouvez écrire un personnalisé défini par l'utilisateur qui encode tout d'octets (je suis en utilisant la sérialisation java ici, mais c'est probablement mieux d'instrument Étincelle de Kryo contexte).

Tout d'abord, définir la classe de l'UDT:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

Puis de l'enregistrer:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

Ensuite, vous pouvez l'utiliser!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

4voto

Les encodeurs de travail plus ou moins la même chose en Spark2.0. Et Kryo est toujours recommandé serialization choix.

Vous pouvez regarder l'exemple suivant avec spark-shell

scala> import spark.implicits._
import spark.implicits._

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> case class NormalPerson(name: String, age: Int) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class NormalPerson

scala> case class ReversePerson(name: Int, age: String) {
 |   def aboutMe = s"I am ${name}. I am ${age} years old."
 | }
defined class ReversePerson

scala> val normalPersons = Seq(
 |   NormalPerson("Superman", 25),
 |   NormalPerson("Spiderman", 17),
 |   NormalPerson("Ironman", 29)
 | )
normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29))

scala> val ds1 = sc.parallelize(normalPersons).toDS
ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int]

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds1.show()
+---------+---+
|     name|age|
+---------+---+
| Superman| 25|
|Spiderman| 17|
|  Ironman| 29|
+---------+---+

scala> ds2.show()
+----+---------+
|name|      age|
+----+---------+
|  25| Superman|
|  17|Spiderman|
|  29|  Ironman|
+----+---------+

scala> ds1.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Superman. I am 25 years old.
I am Spiderman. I am 17 years old.

scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name))
ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string]

scala> ds2.foreach(p => println(p.aboutMe))
I am 17. I am Spiderman years old.
I am 25. I am Superman years old.
I am 29. I am Ironman years old.

Jusqu'à maintenant] il n'y avait pas appropriate encoders dans le champ d'application actuel de sorte que nos personnes n'ont pas été codées comme binary valeurs. Mais qui va changer une fois que nous fournissons quelques implicit codeurs à l'aide de Kryo de la sérialisation.

// Provide Encoders

scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson]
normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary]

scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson]
reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary]

// Ecoders will be used since they are now present in Scope

scala> val ds3 = sc.parallelize(normalPersons).toDS
ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary]

scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name))
ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary]

// now all our persons show up as binary values
scala> ds3.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

scala> ds4.show()
+--------------------+
|               value|
+--------------------+
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
|[01 00 24 6C 69 6...|
+--------------------+

// Our instances still work as expected    

scala> ds3.foreach(p => println(p.aboutMe))
I am Ironman. I am 29 years old.
I am Spiderman. I am 17 years old.
I am Superman. I am 25 years old.

scala> ds4.foreach(p => println(p.aboutMe))
I am 25. I am Superman years old.
I am 29. I am Ironman years old.
I am 17. I am Spiderman years old.

4voto

Akash Mahajan Points 199

Dans le cas de la classe Java Bean, cela peut être utile

 import spark.sqlContext.implicits._
import org.apache.spark.sql.Encoders
implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass])
 

Maintenant, vous pouvez simplement lire le dataFrame en tant que DataFrame personnalisé

 dataFrame.as[MyClass]
 

Cela créera un encodeur de classe personnalisé et non un encodeur binaire.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X