J'ai un petit problème pour joindre deux ensembles de données dans spark, j'ai ceci :
SparkConf conf = new SparkConf()
.setAppName("MyFunnyApp")
.setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.debug.maxToStringFields", 150)
.getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile1)
.as(encoderObject1);
Dataset<MyOwnObject2> object2DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile2)
.as(encoderObject2);
Je peux imprimer le schéma et l'afficher correctement.
//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
object1DS.join(object2DS, object1DS.col("column01")
.equalTo(object2DS.col("column01")))
.as(Encoders.tuple(MyOwnObject1,MyOwnObject2));
La dernière ligne n'arrive pas à se joindre et me donne cette erreur :
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;
C'est vrai, parce que Tuple2 (object2) n'a pas tous les vars...
Puis j'avais essayé ça :
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
.joinWith(object2DS, object1DS
.col("column01")
.equalTo(object2DS.col("column01")));
Et ça marche bien ! Mais, j'ai besoin d'un nouveau Dataset sans tuple, j'ai un object3, qui a quelques vars de l'object1 et object2, alors j'ai ce problème :
Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
//...
//Sets data from object 1 and 2 to 3.
//...
return myOwnObject3;
}, encoderObject3);
Échec ... voici l'erreur :
17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
et sur des milliers de lignes d'erreurs...
Qu'est-ce que je peux faire ? J'avais essayé :
- Faire mon objet seulement avec String, int (ou Integer) et double (ou Double) (pas plus)
- utiliser différents encodeurs comme kryo ou javaSerialization
- utiliser JavaRDD (fonctionne ! mais très lentement) et utiliser des Dataframes avec des Rows (fonctionne, mais je dois changer beaucoup d'objets)
- Tous mes objets java sont sérialisables
- utiliser sparks 2.1.0 et 2.1.1, maintenant j'ai 2.1.1 sur mon pom.xml
Je veux utiliser les Datasets, utiliser la vitesse des Dataframes et la sintaxe des objets de JavaRDD...
De l'aide ?
Merci