48 votes

Kafka 0.11 : comment réinitialiser les décalages ?

J'essaie de réinitialiser le décalage des consommateurs avec les derniers outils CLI pour Kafka.

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics

En conséquence, je vois cette sortie :

TOPIC                            PARTITION  NEW-OFFSET
FirstTopic                       0          0
SecondTopic                      0          0

Mais je relance la commande :

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group my-group --describe

résultats en sortie :

Consumer group 'my-group' has no active members.

TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
FirstTopic         0          1230            1230            0  
SecondTopic        0          1022            1022            0

J'ai essayé d'autres options comme la réinitialisation à un décalage explicite ou la spécification directe du sujet, mais le résultat est le même. La sortie suggère que l'opération a réussi alors que la vérification des décalages avec la commande describe ou le débogage montre que le décalage n'a pas été modifié.

Quelqu'un a réussi à réinitialiser la compensation des consommateurs dans les courtiers non-zookeeper.

80voto

Mickael Maison Points 6089

Par défaut, --reset-offsets imprime simplement le résultat de l'opération. Pour effectuer réellement l'opération, vous devez ajouter --execute à votre commandement :

kafka-consumer-groups.bat --bootstrap-server kafka-host:9092 --group
my-group --reset-offsets --to-earliest --all-topics --execute

3 votes

Existe-t-il d'autres options pour réinitialiser un décalage spécifique, par exemple 1200 pour FirstTopic dans la question ci-dessus ? Pour être plus précis, je voudrais que mon consommateur se réinitialise à un moment précis dans le temps (quel que soit le décalage à ce moment-là).

2 votes

J'aimerais également savoir comment les réaliser via l'API Java de Kafka.

0 votes

Il convient également de mentionner ce point : gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

26voto

Arun211 Points 626

Bien que la réponse acceptée réponde parfaitement à la question du PO, il existe d'autres paramètres disponibles pour réinitialiser les décalages. J'ajoute donc cette réponse pour étendre la réponse acceptée.

Pour réinitialiser le décalage de tous les sujets au plus tôt dans le groupe de consommateurs

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-earliest --all-topics --execute

Pour réinitialiser le décalage d'un sujet spécifique au plus ancien dans le groupe de consommateurs.

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-earliest --topic <my-topic> --execute

Pour réinitialiser le décalage d'un sujet spécifique à un décalage spécifique dans le groupe de consommateurs.

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-offset 1000 --topic <my-topic> --execute

Autres arguments soutenus :

--décalage par [nombre entier positif ou négatif] - Décale le décalage vers l'avant ou vers l'arrière par rapport au nombre entier donné.

-aujourd'hui y -à la dernière minute sont les mêmes que --pour compenser y -au plus tôt .

--to-datetime [Le format de l'heure est yyyy-MM-ddTHH:mm:ss.xxx ]

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --to-datetime 2017-08-04T00:00:00.000 [ --all-topics or --topic <topic-name> ] --execute

-par-durée [Le format est PnDTnHnMnS ]

kafka-consumer-groups.sh --bootstrap-server <kafka_broker_host:9091> --group
    <group_name> --reset-offsets --by-duration PT0H10M0S [ --all-topics or --topic <topic-name> ] --execute

Réinitialisation du décalage de la durée par rapport à l'horodatage actuel.

Comment valider ?

Utilisez la commande ci-dessous pour vérifier les décalages actuels/finaux et pour confirmer que la réinitialisation a effectué les décalages.

kafka-consumer-groups.sh --bootstrap-server <kafka_host:port> --group <group_id> --describe

Exemple de sortie :

Consumer group 'group1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
intro           0          0               99              99              -               -               -

Erreurs possibles :

Erreur : Les affectations ne peuvent être réinitialisées que si le groupe '[nom_du_groupe]' est inactif, mais que l'état actuel est stable.

Stable" signifie qu'il existe un consommateur actif pour ce groupe. Vous devez donc d'abord arrêter le ou les consommateurs actifs et réessayer de réinitialiser les décalages.

Il est pas possible de réinitialiser les offsets s'il y a un consommateur actif pour le groupe de consommateurs.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X