262 votes

Y a-t-il un moyen de purger le sujet dans Kafka ?

J'ai poussé un message qui était trop gros dans un sujet de message kafka sur ma machine locale, maintenant je reçois une erreur :

kafka.common.InvalidMessageSizeException: invalid message size

Augmenter le fetch.size n'est pas l'idéal ici, car je ne veux pas vraiment accepter des messages de cette taille.

455voto

steven appleyard Points 4544

Mettez temporairement à jour le temps de rétention du sujet à une seconde :

kafka-topics.sh \
  --zookeeper <zkhost>:2181 \
  --alter \
  --topic <topic name> \
  --config retention.ms=1000

Et dans les nouvelles versions de Kafka, vous pouvez également le faire avec kafka-configs --entity-type topics

kafka-configs.sh \
  --zookeeper <zkhost>:2181 \
  --entity-type topics \
  --alter \
  --entity-name <topic name> \
  --add-config retention.ms=1000

puis attendez que la purge prenne effet (la durée dépend de la taille du sujet). Une fois la purge effectuée, restaurez le précédent retention.ms valeur.

113voto

rjaiswal Points 1279

Pour purger la file d'attente, vous pouvez supprimer le sujet :

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

puis le recréer :

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test

60voto

Shane Perry Points 720

Bien que la réponse acceptée soit correcte, cette méthode a été dépréciée. La configuration des sujets doit désormais se faire via kafka-configs .

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

Les configurations définies par cette méthode peuvent être affichées avec la commande

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

52voto

Thomas Bratt Points 10738

Voici les étapes à suivre pour supprimer un sujet nommé MyTopic :

  1. Décrivez le sujet et notez les identifiants des courtiers.
  2. Arrêtez le démon Apache Kafka pour chaque ID de courtier répertorié.
  3. Connectez-vous à chaque courtier (à partir de l'étape 1), et supprimez le dossier de données du sujet, par ex. rm -rf /tmp/kafka-logs/MyTopic-0 . Répétez l'opération pour les autres partitions, et pour toutes les répliques.
  4. Supprimez les métadonnées du sujet : zkCli.sh puis rmr /brokers/MyTopic
  5. Démarrer le démon Apache Kafka pour chaque machine arrêtée

Si vous oubliez l'étape 3, Apache Kafka continuera à signaler que le sujet est présent (par exemple, si vous exécutez la commande kafka-list-topic.sh ).

Testé avec Apache Kafka 0.8.0.

47voto

Patrick Points 1473

Testé dans Kafka 0.8.2, pour l'exemple de démarrage rapide : Premièrement, ajoutez une ligne au fichier server.properties dans le dossier config :

delete.topic.enable=true

alors, vous pouvez exécuter cette commande :

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

Puis le recréer, pour que les clients puissent poursuivre leurs opérations sur un sujet vide.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X