Bien que la réponse acceptée réponde parfaitement à la question du PO, il existe d'autres paramètres disponibles pour réinitialiser les décalages. J'ajoute donc cette réponse pour étendre la réponse acceptée.
Pour réinitialiser le décalage de tous les sujets au plus tôt dans le groupe de consommateurs
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-earliest --all-topics --execute
Pour réinitialiser le décalage d'un sujet spécifique au plus ancien dans le groupe de consommateurs.
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-earliest --topic <my-topic> --execute
Pour réinitialiser le décalage d'un sujet spécifique à un décalage spécifique dans le groupe de consommateurs.
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute
Autres arguments soutenus :
--décalage par [nombre entier positif ou négatif] - Décale le décalage vers l'avant ou vers l'arrière par rapport au nombre entier donné.
-aujourd'hui y -à la dernière minute sont les mêmes que --pour compenser y -au plus tôt .
--to-datetime [Le format de l'heure est yyyy-MM-ddTHH:mm:ss.xxx ]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute
-par-durée [Le format est PnDTnHnMnS ]
kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
<group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute
Réinitialisation du décalage de la durée par rapport à l'horodatage actuel.
Comment valider ?
Utilisez la commande ci-dessous pour vérifier les décalages actuels/finaux et pour confirmer que la réinitialisation a effectué les décalages.
kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe
Exemple de sortie :
Consumer group 'group1' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
intro 0 0 99 99 - - -
Erreurs possibles :
Erreur : Les affectations ne peuvent être réinitialisées que si le groupe '[nom_du_groupe]' est inactif, mais que l'état actuel est stable.
Stable" signifie qu'il existe un consommateur actif pour ce groupe. Vous devez donc d'abord arrêter le ou les consommateurs actifs et réessayer de réinitialiser les décalages.
Il est pas possible de réinitialiser les offsets s'il y a un consommateur actif pour le groupe de consommateurs.