262 votes

Y a-t-il un moyen de purger le sujet dans Kafka ?

J'ai poussé un message qui était trop gros dans un sujet de message kafka sur ma machine locale, maintenant je reçois une erreur :

kafka.common.InvalidMessageSizeException: invalid message size

Augmenter le fetch.size n'est pas l'idéal ici, car je ne veux pas vraiment accepter des messages de cette taille.

5voto

Ben Coughlan Points 515

Parfois, si vous avez un cluster saturé (trop de partitions, ou si vous utilisez des données de sujet cryptées, ou si vous utilisez SSL, ou si le contrôleur est sur un mauvais nœud, ou si la connexion est faible, cela prendra beaucoup de temps pour purger le sujet.

Je suis ces étapes, surtout si vous utilisez Avro.

1 : Exécuter avec les outils kafka :

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2 : Run :

kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3 : Remettez la rétention du sujet au réglage original, une fois que le sujet est vide.

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

J'espère que cela aidera quelqu'un, car ce n'est pas facile à annoncer.

5voto

Vladimir Semashkin Points 1084

Il y a beaucoup de bonnes réponses ici, mais parmi elles, je n'en ai pas trouvé une sur Docker. J'ai passé du temps à comprendre que l'utilisation du conteneur broker n'est pas la bonne dans ce cas (évidemment ! !!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
        at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
        at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
        at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
        at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
        at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)

et j'aurais dû utiliser zookeeper:2181 au lieu de --zookeeper localhost:2181 selon mon fichier de composition

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

la commande correcte serait

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

J'espère que cela fera gagner du temps à quelqu'un.

Sachez également que les messages ne seront pas supprimés immédiatement et que cela se produira lorsque le segment du journal sera fermé.

4voto

Mark Butler Points 1463

Le conseil de Thomas est excellent mais malheureusement zkCli dans les anciennes versions de Zookeeper (par exemple 3.3.6) ne semble pas supporter rmr . Par exemple, comparez l'implémentation en ligne de commande dans Zookeeper moderne avec version 3.3 .

Si vous êtes confronté à une ancienne version de Zookeeper, une solution consiste à utiliser une bibliothèque cliente telle que zc.zk pour Python. Pour les personnes qui ne sont pas familières avec Python, vous devez l'installer à l'aide des outils suivants pip ou facile_installation . Ensuite, lancez un shell Python ( python ) et vous pouvez le faire :

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic') 

ou même

zk.delete_recursive('brokers')

si vous voulez supprimer tous les sujets de Kafka.

4voto

Andrew Carter Points 49

L'approche la plus simple consiste à fixer la date des fichiers journaux individuels à une date antérieure à la période de conservation. Le courtier devrait alors les nettoyer et les supprimer pour vous en quelques secondes. Cette méthode présente plusieurs avantages :

  1. Pas besoin de faire descendre les courtiers, c'est une opération d'exécution.
  2. Évite la possibilité d'exceptions de décalage invalide (voir ci-dessous).

D'après mon expérience avec Kafka 0.7.x, la suppression des fichiers journaux et le redémarrage du courtier peuvent entraîner des exceptions de décalage invalide pour certains consommateurs. Cela se produirait parce que le courtier redémarre les décalages à zéro (en l'absence de tout fichier journal existant), et un consommateur qui consommait précédemment à partir du sujet se reconnecterait pour demander un décalage spécifique [une fois valide]. Si ce décalage se trouve être en dehors des limites des nouveaux journaux du sujet, il n'y a pas de problème et le consommateur reprend au début ou à la fin. Mais, si le décalage tombe dans les limites des journaux du nouveau sujet, le courtier tente de récupérer l'ensemble des messages mais échoue parce que le décalage ne s'aligne pas sur un message réel.

Cela pourrait être atténué en effaçant également les offsets des consommateurs dans zookeeper pour ce sujet. Mais si vous n'avez pas besoin d'un sujet vierge et que vous voulez juste supprimer le contenu existant, alors simplement "toucher" quelques journaux de sujets est beaucoup plus facile et plus fiable, que d'arrêter les courtiers, de supprimer les journaux de sujets, et de vider certains noeuds de zookeeper.

4voto

user469718 Points 35

Outre la mise à jour de retention.ms et retention.bytes, j'ai remarqué que la politique de nettoyage des sujets devrait être "delete" (par défaut), si elle est "compact", elle va conserver les messages plus longtemps, c'est-à-dire que si elle est "compact", il faut spécifier supprimer.rétention.ms également.

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

Il a également fallu surveiller les décalages les plus anciens et les plus récents pour confirmer que cela s'est bien produit, vous pouvez également vérifier le du -h /tmp/kafka-logs/test-topic-3-100-*.

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

L'autre problème est que vous devez obtenir la configuration actuelle. premièrement pour que vous vous rappeliez de revenir en arrière une fois la suppression réussie : ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X