J'essaie de faire du streaming structuré à partir de Kafka. J'envisage de stocker les points de contrôle dans HDFS. J'ai lu un blog de Cloudera recommandant de ne pas stocker les points de contrôle dans HDFS pour le streaming Spark. Est-ce le même problème pour les points de contrôle du streaming structuré ? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ .
Dans le streaming structuré, si mon programme spark est en panne pendant un certain temps, comment puis-je obtenir le dernier décalage du répertoire de contrôle et charger les données après ce décalage. Je stocke les points de contrôle dans un répertoire comme indiqué ci-dessous.
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
Mise à jour :
Voici mon programme de streaming structuré qui lit un message Kafka, le décompresse et l'écrit sur HDFS.
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()