4 votes

Point de contrôle Kafka Structured Streaming

J'essaie de faire du streaming structuré à partir de Kafka. J'envisage de stocker les points de contrôle dans HDFS. J'ai lu un blog de Cloudera recommandant de ne pas stocker les points de contrôle dans HDFS pour le streaming Spark. Est-ce le même problème pour les points de contrôle du streaming structuré ? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ .

Dans le streaming structuré, si mon programme spark est en panne pendant un certain temps, comment puis-je obtenir le dernier décalage du répertoire de contrôle et charger les données après ce décalage. Je stocke les points de contrôle dans un répertoire comme indiqué ci-dessous.

 df.writeStream\
        .format("text")\
        .option("path", '\files') \
        .option("checkpointLocation", 'checkpoints\chkpt') \
        .start()

Mise à jour :

Voici mon programme de streaming structuré qui lit un message Kafka, le décompresse et l'écrit sur HDFS.

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", KafkaServer) \
        .option("subscribe", KafkaTopics) \
        .option("failOnDataLoss", "false")\
         .load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream

query = decomp.writeStream\
    .format("text")\
    .option("path", \Data_directory_inHDFS) \
    .option("checkpointLocation", \pathinDHFS\) \
    .start()

query.awaitTermination()

5voto

Naman Agarwal Points 192

Il est préférable de stocker les points de contrôle sur des supports à long terme (HDFS, AWS S3, etc.). Je voudrais ajouter un point ici que la propriété "failOnDataLoss" ne devrait pas être définie à false car ce n'est pas la meilleure pratique. La perte de données est quelque chose que personne ne veut se permettre. Vous êtes sur la bonne voie.

4voto

dexter007 Points 203

En streaming structuré, si mon programme spark est en panne pendant un certain temps, comment puis-je obtenir le dernier décalage du répertoire de point de contrôle et charger les données après ce décalage.

Sous votre dossier checkpointdir, vous trouverez un dossier nommé 'offsets'. Le dossier 'offsets' maintient les prochains offsets à demander à kafka. Ouvrez le dernier fichier (dernier fichier batch) sous le dossier 'offsets', les prochains offsets attendus seront dans le format ci-dessous

{"kafkatopicname":{"2":16810618,"1":16810853,"0":91332989}}

Pour charger les données après ce décalage, définissez la propriété suivante à votre flux de lecture spark read stream

 .option("startingOffsets", "{\""+topic+"\":{\"0\":91332989,\"1\":16810853,\"2\":16810618}}")

0,1,2 sont les partitions dans le sujet.

0voto

Abhay Dandekar Points 11

Dans votre requête, essayez d'appliquer un point de contrôle tout en écrivant les résultats dans un stockage persistant comme HDFS dans un format comme parquet. Cela a bien fonctionné pour moi.

0voto

Avi Chalbani Points 331

Si j'ai bien compris l'artificiel, il recommande de maintenir la gestion de l'offset soit en dedans : Hbase, Kafka, HDFS ou Zookeeper.

"Il convient de mentionner que vous pouvez également stocker les décalages dans un système de stockage système de stockage comme HDFS. Le stockage des décalages dans HDFS est une approche moins populaire moins populaire que les options ci-dessus, car HDFS a une latence plus élevée que d'autres d'autres systèmes comme ZooKeeper et HBase."

vous trouverez dans la documentation Spark comment relancer une requête à partir d'un point de contrôle existant : http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X