Vous devrez gérer manuellement les clés uniques : mais avec cette approche, il est possible lors de l'utilisation de
KafkaUtils.createDirectStream
À partir de la documentation de Spark http://spark.apache.org/docs/latest/streaming-kafka-integration.html :
Approche 2 : Approche directe (sans récepteurs)
chaque enregistrement est reçu par Spark Streaming effectivement exactement une fois malgré les échecs.
Et voici l'exigence d'idempotence
- par exemple, en sauvegardant une clé unique par message dans Postgres
:
Afin d'atteindre des sémantiques exactement une fois pour la sortie de vos résultats, votre opération de sortie qui enregistre les données dans un entrepôt de données externe doit être soit idempotente, soit une transaction atomique qui enregistre les résultats et les décalages (voir Sémantique des opérations de sortie dans le guide de programmation principal pour plus d'informations).
Voici une idée du genre de code dont vous auriez besoin pour gérer les clés uniques (de http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/) :
stream.foreachRDD { rdd =>
rdd.foreachPartition { iter =>
// assurez-vous que le pool de connexions est configuré sur l'exécuteur avant d'écrire
SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
iter.foreach { case (key, msg) =>
DB.autoCommit { implicit session =>
// la clé unique pour l'idempotence est simplement le texte du message lui-même, à titre d'exemple
sql"insert into idem_data(msg) values (${msg})".update.apply
}
}
}
}
Un identifiant unique par message devrait être géré.