3 votes

Comment faire pour que les données de Spark soient écrites dans un topic kafka au format avro ?

J'ai un Dataframe dans Spark qui ressemble à

événementDF

   Sno|UserID|TypeExp
    1|JAS123|MOVIE
    2|ASP123|GAMES
    3|JAS123|CLOTHING
    4|DPS123|MOVIE
    5|DPS123|CLOTHING
    6|ASP123|MEDICAL
    7|JAS123|OTH
    8|POQ133|MEDICAL
    .......
    10000|DPS123|OTH

J'ai besoin de l'écrire dans un topic Kafka au format Avro Actuellement, je suis capable d'écrire dans Kafka en tant que JSON en utilisant le code suivant

val kafkaUserDF: DataFrame = eventDF.select(to_json(struct(eventDF.columns.map(column):_*)).alias("value"))
  kafkaUserDF.selectExpr("CAST(value AS STRING)").write.format("kafka")
    .option("kafka.bootstrap.servers", "Host:port")
    .option("topic", "eventdf")
    .save()

Maintenant, je veux écrire ceci au format Avro dans un sujet Kafka

3voto

hi-zir Points 19277

Étincelle >= 2.4 :

Vous pouvez utiliser to_avro de la fonction spark-avro bibliothèque.

import org.apache.spark.sql.avro._

eventDF.select(
  to_avro(struct(eventDF.columns.map(column):_*)).alias("value")
)

Etincelle < 2.4

Vous devez procéder de la même manière :

  • Créer une fonction qui écrit un enregistrement Avro sérialisé dans ByteArrayOutputStream et renvoie le résultat. Une implémentation naïve (qui ne prend en charge que les objets plats) pourrait être similaire à (adoptée à partir de Exemple Kafka Avro Scala par Sushil Kumar Singh )

    import org.apache.spark.sql.Row
    
    def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
      val gr: GenericRecord = new GenericData.Record(schema)
      row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))
    
      val writer = new SpecificDatumWriter[GenericRecord](schema)
      val out = new ByteArrayOutputStream()
      val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
      writer.write(gr, encoder)
      encoder.flush()
      out.close()
    
      out.toByteArray()
    }
  • Le convertir en udf :

    import org.apache.spark.sql.functions.udf
    
    val schema: org.apache.avro.Schema
    val encodeUDF = udf(encode(schema) _)
  • A utiliser en remplacement de to_json

    eventDF.select(
      encodeUDF(struct(eventDF.columns.map(column):_*)).alias("value")
    )

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X