8 votes

Consommez la dernière valeur d'un sujet pour chaque clé.

J'ai un producteur Kafka qui produit des messages à un rythme élevé (la clé du message est, disons, un nom d'utilisateur et la valeur est son score actuel dans un jeu). Le consommateur Kafka est relativement lent à traiter les messages consommés. Ici, mon exigence est de montrer le score le plus récent et d'éviter de montrer des données périmées, avec le compromis que certains scores peuvent ne jamais être montrés.

Essentiellement, pour chacun des noms d'utilisateur, je peux avoir des centaines de messages dans la même partition, mais je veux toujours lire le dernier.

Une solution rudimentaire qui a été mise en œuvre était la suivante : Le producteur envoie juste une clé comme chaque message et la valeur réelle est écrite dans une base de données, qui est partagée avec le consommateur. Le consommateur lit chaque clé de la file d'attente et chaque valeur de la base de données. Ici, l'objectif de lire toujours la dernière valeur est atteint par le producteur qui écrase la valeur dans la base de données -- ainsi le consommateur qui est en fait en train de lire une clé donnée consommera en fait la dernière valeur. Mais cette solution présente quelques inconvénients en raison du nombre élevé de lectures et de mises à jour (lenteur, conditions de course, etc.).

Je cherche une façon plus naturelle de résoudre ce problème dans kafka ou kafka streams où je peux en quelque sorte définir obtenir la dernière valeur pour une clé à partir du flux de données pour chaque clé . Merci !

0voto

senseiwu Points 1716

Le code ci-dessous m'a aidé

KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> dataTable = builder.table("input-topic");
dataTable.toStream().foreach((key, message) -> client.post(message));
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

Ce qui rend cela possible dans la pratique est le compactage en mémoire du flux entrant (détails expliqués aquí ). On peut contrôler la pression en utilisant les paramètres cache.max.bytes.buffering y commit.interval.ms

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X