Kafka retourne endOffset 15 pour une partition, mais le dernier message qui peut être consommé a l'offset 13, plutôt que 14, ce que je m'attendais. Je me demande pourquoi.
Les documents Kafka indiquent
Dans le niveau d'isolation read_uncommitted par défaut, l'offset de fin est le haut watermark (c'est-à-dire l'offset du dernier message répliqué avec succès plus un). Pour les consommateurs read_committed, l'offset de fin est le dernier offset stable (LSO), qui est le minimum du haut watermark et le plus petit offset de toute transaction ouverte.
Voici la sortie de kafkacat. J'utilise kafkacat, car il peut imprimer les offsets des messages:
$ kafkacat -Ce -p0 -tTK -f'offset: %o key: %k\n'
offset: 0 key: 0108
offset: 1 key: 0253
offset: 4 key: 0278
offset: 5 key: 0198
offset: 8 key: 0278
offset: 9 key: 0210
offset: 10 key: 0253
offset: 11 key: 1058
offset: 12 key: 0141
offset: 13 key: 1141
% Atteint la fin du sujet TK [0] à l'offset 15 : sortie
Ce qui est aussi déconcertant - et cela pourrait très bien être lié - c'est que les offsets ne sont pas consécutifs, bien que je n'ai pas configuré la compaction, etc.
Quelques détails supplémentaires:
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic TK --describe
Sujet: TK Nombre de partitions: 2 Facteur de réplication: 1 Configurations: segment.bytes=1073741824
Sujet: TK Partition: 0 Leader: 0 Répliques: 0 Isr: 0
Sujet: TK Partition: 1 Leader: 0 Répliques: 0 Isr: 0
Impression des clés via kafka-console-consumer.sh:
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TK \
--offset earliest --partition 0 --timeout-ms 5000 \
--property print.key=true --property print.value=false
0108
0253
0278
0198
0278
0210
0253
1058
0141
1141
[2021-09-15 10:54:06,556] ERREUR Erreur de traitement du message, processus de consommation terminé : (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Traité un total de 10 messages
N.B. Ce sujet a été produit sans implication de transactions, et *) la consommation est faite en mode read_uncommitted
.
*) En fait, processing.guarantee est défini à exactly_once_beta
, ce qui reviendrait à utiliser des transactions.
Plus d'informations Il s'avère que je peux reproduire ce cas de manière fiable avec mon application Streams (1. effacer les données de kafka/zookeeper, 2. recréer les sujets, 3. exécuter l'application), dont la sortie est le sujet qui montre ce problème. Entre-temps, j'ai simplifié l'application Streams à cette topologie sans effet et je peux toujours le reproduire :
Topologies:
Sous-topologie: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [TK1])
--> KSTREAM-SINK-0000000001
Sink: KSTREAM-SINK-0000000001 (topic: TK)
<-- KSTREAM-SOURCE-0000000000
Nouvelles Entre-temps, j'ai remplacé le courtier Kafka en cours d'exécution localement (2.5.0) par un fonctionnant dans un conteneur Docker (wurstmeister/kafka:2.13-2.6.0). Le problème persiste.
L'application utilise les bibliothèques Kafka versionnées 6.0.1-ccs, correspondant à 2.6.0.