3 votes

Sous quelles circonstances endOffset > lastMsg.offset + 1?

Kafka retourne endOffset 15 pour une partition, mais le dernier message qui peut être consommé a l'offset 13, plutôt que 14, ce que je m'attendais. Je me demande pourquoi.

Les documents Kafka indiquent

Dans le niveau d'isolation read_uncommitted par défaut, l'offset de fin est le haut watermark (c'est-à-dire l'offset du dernier message répliqué avec succès plus un). Pour les consommateurs read_committed, l'offset de fin est le dernier offset stable (LSO), qui est le minimum du haut watermark et le plus petit offset de toute transaction ouverte.

Voici la sortie de kafkacat. J'utilise kafkacat, car il peut imprimer les offsets des messages:

$ kafkacat -Ce -p0 -tTK -f'offset: %o key: %k\n'
offset: 0 key: 0108
offset: 1 key: 0253
offset: 4 key: 0278
offset: 5 key: 0198
offset: 8 key: 0278
offset: 9 key: 0210
offset: 10 key: 0253
offset: 11 key: 1058
offset: 12 key: 0141
offset: 13 key: 1141
% Atteint la fin du sujet TK [0] à l'offset 15 : sortie

Ce qui est aussi déconcertant - et cela pourrait très bien être lié - c'est que les offsets ne sont pas consécutifs, bien que je n'ai pas configuré la compaction, etc.

Quelques détails supplémentaires:

$ kafka-topics.sh --bootstrap-server localhost:9092 --topic TK --describe
Sujet: TK       Nombre de partitions: 2       Facteur de réplication: 1    Configurations: segment.bytes=1073741824
        Sujet: TK       Partition: 0    Leader: 0       Répliques: 0     Isr: 0
        Sujet: TK       Partition: 1    Leader: 0       Répliques: 0     Isr: 0

Impression des clés via kafka-console-consumer.sh:

$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TK \
  --offset earliest --partition 0 --timeout-ms 5000 \
  --property print.key=true --property print.value=false
0108
0253
0278
0198
0278
0210
0253
1058
0141
1141
[2021-09-15 10:54:06,556] ERREUR Erreur de traitement du message, processus de consommation terminé :  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Traité un total de 10 messages

N.B. Ce sujet a été produit sans implication de transactions, et *) la consommation est faite en mode read_uncommitted.

*) En fait, processing.guarantee est défini à exactly_once_beta, ce qui reviendrait à utiliser des transactions.


Plus d'informations Il s'avère que je peux reproduire ce cas de manière fiable avec mon application Streams (1. effacer les données de kafka/zookeeper, 2. recréer les sujets, 3. exécuter l'application), dont la sortie est le sujet qui montre ce problème. Entre-temps, j'ai simplifié l'application Streams à cette topologie sans effet et je peux toujours le reproduire :

Topologies:
   Sous-topologie: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [TK1])
      --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001 (topic: TK)
      <-- KSTREAM-SOURCE-0000000000

Nouvelles Entre-temps, j'ai remplacé le courtier Kafka en cours d'exécution localement (2.5.0) par un fonctionnant dans un conteneur Docker (wurstmeister/kafka:2.13-2.6.0). Le problème persiste.

L'application utilise les bibliothèques Kafka versionnées 6.0.1-ccs, correspondant à 2.6.0.

0voto

Eugene Beresovksy Points 3852

Lorsque je supprime le paramètre processing.guarantee: exactly_once_beta, le problème disparaît. En ce qui concerne ce problème, peu importe que j'utilise exactly_once_beta ou exactly_once.

Je me demande encore pourquoi cela se produit avec exactly_once(_beta) - après tout, dans mes tests, tout se passe bien et il n'y a pas de rollback de transactions, etc.

Dans mes derniers tests, cette règle semble s'appliquer à toutes les partitions contenant au moins un élément:

endOffset == lastMsg.offset + 3

Ce qui est 2 de plus que prévu.

La documentation Kafka mentionnée dans la question indique que

Pour les consommateurs en mode read_committed, l'offset de fin est le dernier offset stable (LSO), qui est le minimum entre la marque d'eau haute et le plus petit offset de toute transaction ouverte.

Est-ce que Kafka utilise peut-être des offsets pour (2 ?) transactions en cours par partition?

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X