Mise à jour
Cette réponse est toujours valable et instructif, même si les choses sont maintenant mieux depuis la 2.2/2.3, qui ajoute intégré dans le support codeur pour Set
, Seq
, Map
, Date
, Timestamp
, et BigDecimal
. Si vous vous en tenez à faire des types avec uniquement des classes de cas et de la Scala types, vous devez être bien avec juste l'implicite en SQLImplicits
.
Malheureusement, pratiquement rien n'a été ajouté pour aider à cela. Recherche d' @since 2.0.0
en Encoders.scala
ou SQLImplicits.scala
trouve les choses surtout à voir avec les types primitifs (et quelques réglages de classes de cas). Donc, première chose à dire: il n'y a actuellement pas de véritable bonne prise en charge personnalisée de la classe des encodeurs. Avec cela de la route, ce qui suit est quelques trucs qui ne aussi bon travail que nous pouvons espérer, compte tenu de ce que nous avons actuellement à notre disposition. Comme un premier avertissement: cela ne fonctionne pas parfaitement, et je vais faire de mon mieux pour rendre toutes les limitations clairs et directs.
Quel est le problème exactement
Lorsque vous voulez faire un jeu de données, la Spark "exige un codeur (pour convertir une JVM objet de type T et de l'intérieur Spark SQL représentation) qui est généralement créé automatiquement par le biais implicites à partir d'un SparkSession
, ou peut être créé explicitement en appelant des méthodes statiques sur Encoders
" (extrait de la docs sur createDataset
). Un codeur prendra la forme Encoder[T]
où T
est le type d'encodage. La première suggestion est d'ajouter import spark.implicits._
(ce qui vous donne ces implicite encodeurs) et la deuxième suggestion est de transmettre explicitement dans l'implicite du codeur à l'aide de cet ensemble de codeur fonctions connexes.
Il n'y a pas de codeur disponible pour des cours réguliers, de sorte
import spark.implicits._
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
vous donnera la implicites suivantes liées erreur de compilation:
Impossible de trouver le codeur de type stockées dans une base de données. Les types primitifs (Int, String, etc) et types de Produit (cas des classes) sont pris en charge par l'importation de sqlContext.implicites._ Soutien pour la sérialisation d'autres types seront ajoutés dans les prochaines versions
Toutefois, si vous enroulez quel que soit le type que vous venez d'utiliser pour obtenir l'erreur ci-dessus dans certaines classe qui étend la classe Product
, l'erreur de prêter à confusion, de retard de l'exécution, de sorte
import spark.implicits._
case class Wrap[T](unwrap: T)
class MyObj(val i: Int)
// ...
val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
Compile très bien, mais échoue lors de l'exécution avec
java.lang.UnsupportedOperationException: n'a Pas d'Encodeur trouvé pour MyObj
La raison pour cela est que les codeurs Étincelle crée avec les implicites sont en réalité qu'un fait au moment de l'exécution (via scala relfection). Dans ce cas, tous Étincelle vérifie au moment de la compilation, c'est que l'extérieur de la classe s'étend Product
(toutes les classes), et seulement réalise au moment de l'exécution qu'il ne sait toujours pas quoi faire avec MyObj
(le même problème se produit si j'ai essayé de faire un Dataset[(Int,MyObj)]
- Spark attend jusqu'à ce que l'exécution pour barf sur MyObj
). Ce sont des problèmes centraux qui sont dans le besoin d'être corrigé:
- certaines classes qui étendent
Product
de la compilation en dépit de toujours s'écraser au moment de l'exécution et
- il n'y a aucun moyen de passer en custom encodeurs pour les types imbriqués (je n'ai aucun moyen de l'alimentation de l'Étincelle d'un codeur pour seulement
MyObj
telle qu'elle sait alors comment coder Wrap[MyObj]
ou (Int,MyObj)
).
Suffit d'utiliser kryo
La solution tout le monde suggère est d'utiliser l' kryo
de l'encodeur.
import spark.implicits._
class MyObj(val i: Int)
implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
// ...
val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
Cela devient assez fastidieux rapide. Surtout si votre code est de manipuler toutes sortes de jeux de données, l'assemblage, le groupement des etc. Vous vous retrouvez en accumulant un tas de extra implicites. Alors, pourquoi ne pas simplement faire un implicite que tout cela est-il automatiquement?
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
org.apache.spark.sql.Encoders.kryo[A](ct)
Et maintenant, il semble que je peux faire presque tout ce que je veux (l'exemple ci-dessous ne fonctionne pas dans l' spark-shell
où spark.implicits._
est automatiquement importé)
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
Ou presque. Le problème est que l'utilisation d' kryo
conduit à Étincelle juste le stockage de chaque ligne dans le jeu de données comme un plat d'objet binaire. Pour map
, filter
, foreach
c'est suffisant, mais pour des opérations comme join
, la Spark a vraiment besoin de ces pour être répartis en colonnes. L'inspection du schéma d' d2
ou d3
, que vous voyez là est juste une colonne binaire:
d2.printSchema
// root
// |-- value: binary (nullable = true)
Solution partielle pour les n-uplets
Donc, en utilisant la magie des implicites dans Scala (de plus en 6.26.3 la Surcharge de Résolution), je peux me faire une série d'implicites qui va le faire aussi bien que possible, au moins pour les tuples, et va bien travailler avec l'existant implicites:
import org.apache.spark.sql.{Encoder,Encoders}
import scala.reflect.ClassTag
import spark.implicits._ // we can still take advantage of all the old implicits
implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
implicit def tuple2[A1, A2](
implicit e1: Encoder[A1],
e2: Encoder[A2]
): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
implicit def tuple3[A1, A2, A3](
implicit e1: Encoder[A1],
e2: Encoder[A2],
e3: Encoder[A3]
): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
// ... you can keep making these
Puis, armés de ces implicites, je peux faire mon exemple ci-dessus, bien qu'avec une certaine colonne de renommage
class MyObj(val i: Int)
val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
val d3 = d1.map(d => (d.i ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
Je n'ai pas encore compris comment faire pour obtenir l'attend tuple noms (_1
, _2
, ...) par défaut sans les renommer - si quelqu'un veut jouer avec cela, c' est là que le nom de l' "value"
se présente et c' est là que le tuple noms sont généralement ajoutés. Toutefois, le point essentiel est que j'ai maintenant une belle structuré schéma:
d4.printSchema
// root
// |-- _1: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
// |-- _2: struct (nullable = false)
// | |-- _1: integer (nullable = true)
// | |-- _2: binary (nullable = true)
Donc, en résumé, cette solution de contournement:
- nous permet d'obtenir des colonnes distinctes pour les n-uplets (si nous pouvons nous rejoindre sur les tuples de nouveau, yay!)
- on peut de nouveau compter sur le implicites (donc pas besoin de passer en
kryo
partout)
- est presque entièrement rétro-compatible avec
import spark.implicits._
(avec quelques renommer impliqués)
- ne pas les laisser nous rejoindre sur l'
kyro
sérialisé binaire colonnes, a fortiori sur les champs de ceux qui peuvent avoir
- a la désagréable effet de bord de la renommant certains de le tuple les colonnes "valeur" (si nécessaire, cela peut être annulée par la conversion de
.toDF
, de la spécification de nouveaux noms de colonnes, et de le convertir en un ensemble de données et les noms de schéma semblent être conservée à travers les jointures, où ils sont le plus nécessaires).
Solution partielle pour les classes en général
Celui-ci est moins agréable et n'a pas de bonne solution. Cependant, maintenant que nous avons le tuple solution ci-dessus, j'ai un pressentiment de la conversion implicite de la solution d'une autre réponse sera un peu moins douloureux aussi, puisque vous pouvez convertir votre plus complexe classes de n-uplets. Puis, après la création du jeu de données, vous auriez probablement renommer les colonnes à l'aide de la dataframe approche. Si tout va bien, c'est vraiment une amélioration, car je peux maintenant effectuer des jointures sur les champs de mes classes. Si j'avais juste utilisé un plat binaires kryo
sérialiseur qui n'aurait pas été possible.
Voici un exemple qui fait un peu de tout: j'ai une classe MyObj
qui a des champs de types Int
, java.util.UUID
, et Set[String]
. Le premier prend soin de lui-même. La seconde, bien que je ne pouvais sérialiser l'aide d' kryo
serait plus utile s'il est stocké en tant que String
(depuis UUID
s sont généralement quelque chose que je souhaite rejoindre contre). La troisième vraiment seulement le fait dans une colonne binaire.
class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
// alias for the type to convert to and from
type MyObjEncoded = (Int, String, Set[String])
// implicit conversions
implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
implicit def fromEncoded(e: MyObjEncoded): MyObj =
new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
Maintenant, je peux créer un jeu de données avec un joli schéma en utilisant cette machine:
val d = spark.createDataset(Seq[MyObjEncoded](
new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
)).toDF("i","u","s").as[MyObjEncoded]
Et le schéma montre moi j'ai des colonnes avec les noms corrects et avec la première, les deux choses que je peux rejoindre contre.
d.printSchema
// root
// |-- i: integer (nullable = false)
// |-- u: string (nullable = true)
// |-- s: binary (nullable = true)