113 votes

Existe-t-il un moyen de supprimer toutes les données d'un sujet ou de supprimer le sujet avant chaque exécution ?

Existe-t-il un moyen de supprimer toutes les données d'un sujet ou de supprimer le sujet avant chaque exécution ?

Puis-je modifier le fichier KafkaConfig.scala pour changer l'adresse de l'utilisateur ? logRetentionHours la propriété ? Existe-t-il un moyen de supprimer les messages dès que le consommateur les lit ?

J'utilise des producteurs pour récupérer les données de quelque part et envoyer les données à un sujet particulier où un consommateur consomme, puis-je supprimer toutes les données de ce sujet à chaque exécution ? Je ne veux que des nouvelles données à chaque fois dans le sujet. Y a-t-il un moyen de réinitialiser le sujet d'une manière ou d'une autre ?

84voto

Patrick Points 1473

Comme je l'ai mentionné ici Purger la file d'attente Kafka :

Testé dans Kafka 0.8.2, pour l'exemple de démarrage rapide : Premièrement, ajoutez une ligne au fichier server.properties dans le dossier config :

delete.topic.enable=true

alors, vous pouvez exécuter cette commande :

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

72voto

Hild Points 458

Je ne pense pas qu'il soit encore supporté. Jetez un coup d'œil à ceci Question JIRA "Support pour l'ajout et la suppression de sujets".

Pour supprimer manuellement :

  1. Arrêter le cluster
  2. Nettoyer le répertoire des journaux de kafka (spécifié par l'option log.dir attribut dans kafka config ) ainsi que les données du zookeeper
  3. Redémarrer le cluster

Pour un sujet donné, vous pouvez

  1. Arrêter kafka
  2. Nettoyer le journal de kafka spécifique à la partition, kafka stocke son fichier journal dans un format de "logDir/topic-partition" donc pour un sujet nommé "MonSujet" le journal pour la partition id 0 sera stocké dans /tmp/kafka-logs/MyTopic-0 donde /tmp/kafka-logs est spécifié par l'option log.dir attribut
  3. Redémarrer kafka

Esto es NOT une approche bonne et recommandée mais elle devrait fonctionner. Dans le fichier de configuration du courtier Kafka, l'élément log.retention.hours.per.topic est utilisé pour définir The number of hours to keep a log file before deleting it for some specific topic

En outre, existe-t-il un moyen de supprimer les messages dès que le consommateur les lit ?

Desde el Documentation sur Kafka :

Le cluster Kafka conserve tous les messages publiés, qu'ils aient été consommés ou non, pendant une période de temps configurable. Par exemple, si la rétention du journal est définie sur deux jours, alors pendant les deux jours suivant la publication d'un message, celui-ci est disponible pour être consommé, après quoi il sera écarté pour libérer de l'espace. Les performances de Kafka sont effectivement constantes par rapport à la taille des données, la rétention d'un grand nombre de données n'est donc pas un problème.

En fait, la seule métadonnée conservée pour chaque consommateur est la position du consommateur dans le journal, appelée "offset". Ce décalage est contrôlé par le consommateur : normalement, un consommateur avance son décalage de façon linéaire au fur et à mesure qu'il lit les messages, mais en fait la position est contrôlée par le consommateur et il peut consommer les messages dans l'ordre qu'il souhaite. Par exemple, un consommateur peut revenir à un décalage plus ancien pour retraiter.

Pour trouver le décalage de début de lecture dans Kafka 0.8 Exemple simple de consommateur ils disent

Kafka inclut deux constantes pour vous aider, kafka.api.OffsetRequest.EarliestTime() trouve le début des données dans les journaux et commence la diffusion à partir de là, kafka.api.OffsetRequest.LatestTime() ne diffusera que les nouveaux messages.

Vous pouvez également y trouver le code d'exemple pour gérer le décalage au niveau du consommateur.

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

16voto

Swapnil Shirke Points 575

Testé avec kafka 0.10

1. stop zookeeper & Kafka server,
2. then go to 'kafka-logs' folder , there you will see list of kafka topic folders, delete folder with topic name
3. go to 'zookeeper-data' folder , delete data inside that.
4. start zookeeper & kafka server again.

Note : si vous supprimez un ou plusieurs dossiers de sujets dans kafka-logs mais pas dans le dossier zookeeper-data, vous verrez que les sujets sont toujours là.

10voto

vdlen Points 101

Ci-dessous se trouvent les scripts pour vider et supprimer un sujet Kafka en supposant que localhost est le serveur zookeeper et que Kafka_Home est défini sur le répertoire d'installation :

Le script ci-dessous permet de vide un sujet en fixant son temps de rétention à 1 seconde, puis en supprimant la configuration :

#!/bin/bash
echo "Enter name of topic to empty:"
read topicName
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --add-config retention.ms=1000
sleep 5
/$Kafka_Home/bin/kafka-configs --zookeeper localhost:2181 --alter --entity-type topics --entity-name $topicName --delete-config retention.ms

A suppression totale topics vous devez arrêter le(s) broker(s) kafka applicable(s) et supprimer son (leurs) répertoire(s) du répertoire des logs kafka (par défaut : /tmp/kafka-logs) et ensuite exécuter ce script pour supprimer le topic de zookeeper. Pour vérifier qu'il a été supprimé de zookeeper la sortie de ls /brokers/topics ne devrait plus inclure le sujet :

#!/bin/bash
echo "Enter name of topic to delete from zookeeper:"
read topicName
/$Kafka_Home/bin/zookeeper-shell localhost:2181 <<EOF
rmr /brokers/topics/$topicName
ls /brokers/topics
quit
EOF

9voto

Ivan Balashov Points 321

Comme solution de contournement, vous pouvez ajuster les paramètres de rétention d'exécution par sujet, par exemple bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config retention.bytes=1 ( rétention.octets=0 pourrait également fonctionner)

Après un court moment, Kafka devrait libérer l'espace. Je ne sais pas si cela a des implications par rapport à la recréation du sujet.

ps. Mieux vaut remettre les paramètres de rétention, une fois que Kafka aura fait le ménage.

Vous pouvez également utiliser retention.ms pour conserver les données historiques

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X