5 votes

Comment joindre deux jeux de données spark en un seul avec des objets java ?

J'ai un petit problème pour joindre deux ensembles de données dans spark, j'ai ceci :

SparkConf conf = new SparkConf()
    .setAppName("MyFunnyApp")
    .setMaster("local[*]");

SparkSession spark = SparkSession
    .builder()
    .config(conf)
    .config("spark.debug.maxToStringFields", 150)
    .getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile1)
    .as(encoderObject1);

Dataset<MyOwnObject2> object2DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile2)
    .as(encoderObject2);

Je peux imprimer le schéma et l'afficher correctement.

//Here start the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01")
    .equalTo(object2DS.col("column01")))
    .as(Encoders.tuple(MyOwnObject1,MyOwnObject2));

La dernière ligne n'arrive pas à se joindre et me donne cette erreur :

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;

C'est vrai, parce que Tuple2 (object2) n'a pas tous les vars...

Puis j'avais essayé ça :

 Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
    .joinWith(object2DS, object1DS
        .col("column01")
        .equalTo(object2DS.col("column01")));

Et ça marche bien ! Mais, j'ai besoin d'un nouveau Dataset sans tuple, j'ai un object3, qui a quelques vars de l'object1 et object2, alors j'ai ce problème :

Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
    MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
    MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
    MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
    //...
    //Sets data from object 1 and 2 to 3.
    //...
    return myOwnObject3;
}, encoderObject3);

Échec ... voici l'erreur :

17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import

et sur des milliers de lignes d'erreurs...

Qu'est-ce que je peux faire ? J'avais essayé :

  • Faire mon objet seulement avec String, int (ou Integer) et double (ou Double) (pas plus)
  • utiliser différents encodeurs comme kryo ou javaSerialization
  • utiliser JavaRDD (fonctionne ! mais très lentement) et utiliser des Dataframes avec des Rows (fonctionne, mais je dois changer beaucoup d'objets)
  • Tous mes objets java sont sérialisables
  • utiliser sparks 2.1.0 et 2.1.1, maintenant j'ai 2.1.1 sur mon pom.xml

Je veux utiliser les Datasets, utiliser la vitesse des Dataframes et la sintaxe des objets de JavaRDD...

De l'aide ?

Merci

-1voto

viti Points 24

J'ai finalement trouvé une solution,

J'ai eu un problème avec l'option inferSchema lorsque mon code créait un Dataset. J'ai une colonne String et l'option inferSchema me renvoie une colonne Integer parce que toutes les valeurs sont "numeric", mais j'ai besoin de les utiliser comme String (comme "0001", "0002"...) J'ai besoin de faire un schéma, mais j'ai beaucoup de variables, alors j'écris ceci avec toutes mes classes :

List<StructField> fieldsObject1 = new ArrayList<>();
for (Field field : MyOwnObject1.class.getDeclaredFields()) {
    fieldsObject1.add(DataTypes.createStructField(
        field.getName(),
        CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
        true)
    );
}
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .schema(schemaObject1)
    .csv(pathToFile1)
    .as(encoderObject1);

Cela fonctionne bien.

La "meilleure" solution serait la suivante :

  Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .schema(encoderObject1.schema())
    .csv(pathToFile1)
    .as(encoderObject1);

mais encoderObject1.schema() me renvoie un Schema avec des vars dans l'ordre alphabétique, pas dans l'ordre original, alors cette option échoue quand je lis un csv. Peut-être que les encodeurs devraient retourner un schéma avec les variables dans l'ordre original et non dans l'ordre alphabétique.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X