113 votes

Existe-t-il un moyen de supprimer toutes les données d'un sujet ou de supprimer le sujet avant chaque exécution ?

Existe-t-il un moyen de supprimer toutes les données d'un sujet ou de supprimer le sujet avant chaque exécution ?

Puis-je modifier le fichier KafkaConfig.scala pour changer l'adresse de l'utilisateur ? logRetentionHours la propriété ? Existe-t-il un moyen de supprimer les messages dès que le consommateur les lit ?

J'utilise des producteurs pour récupérer les données de quelque part et envoyer les données à un sujet particulier où un consommateur consomme, puis-je supprimer toutes les données de ce sujet à chaque exécution ? Je ne veux que des nouvelles données à chaque fois dans le sujet. Y a-t-il un moyen de réinitialiser le sujet d'une manière ou d'une autre ?

1voto

ForeverLearner Points 675

J'utilise l'utilitaire ci-dessous pour nettoyer après l'exécution de mon test d'intégration.

Il utilise les dernières AdminZkClient api. L'ancienne api a été dépréciée.

import javax.inject.Inject
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.utils.Time

class ZookeeperUtils @Inject() (config: AppConfig) {

  val testTopic = "users_1"

  val zkHost = config.KafkaConfig.zkHost
  val sessionTimeoutMs = 10 * 1000
  val connectionTimeoutMs = 60 * 1000
  val isSecure = false
  val maxInFlightRequests = 10
  val time: Time = Time.SYSTEM

  def cleanupTopic(config: AppConfig) = {

    val zkClient = KafkaZkClient.apply(zkHost, isSecure, sessionTimeoutMs, connectionTimeoutMs, maxInFlightRequests, time)
    val zkUtils = new AdminZkClient(zkClient)

    val pp = new Properties()
    pp.setProperty("delete.retention.ms", "10")
    pp.setProperty("file.delete.delay.ms", "1000")
    zkUtils.changeTopicConfig(testTopic , pp)
    //    zkUtils.deleteTopic(testTopic)

    println("Waiting for topic to be purged. Then reset to retain records for the run")
    Thread.sleep(60000L)

    val resetProps = new Properties()
    resetProps.setProperty("delete.retention.ms", "3000000")
    resetProps.setProperty("file.delete.delay.ms", "4000000")
    zkUtils.changeTopicConfig(testTopic , resetProps)

  }

}

Il existe une option permettant de supprimer le sujet. Mais, elle marque le sujet pour la suppression. Zookeeper supprime ensuite le sujet. Comme cela peut être d'une longueur imprévisible, je préfère l'approche de retention.ms

1voto

yogender Points 149

Faire :

cd /path/to/kafkaInstallation/kafka-server
bin/kafka-topics.sh  --bootstrap-server  localhost:9092 --delete --topic name_of_kafka_topic

alors vous pouvez le recréer en utilisant :

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic name_of_kafka_topic

0voto

En supprimant manuellement un sujet d'un cluster kafka, vous pouvez vérifier ceci https://github.com/darrenfu/bigdata/issues/6 Une étape vitale qui est souvent omise dans la plupart des solutions consiste à supprimer la /config/topics/<topic_name> dans ZK.

0voto

J'utilise ce script :

#!/bin/bash
topics=`kafka-topics --list --zookeeper zookeeper:2181`
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --config ${p}=100
    done
done
sleep 60
for t in $topics; do 
    for p in retention.ms retention.bytes segment.ms segment.bytes; do
        kafka-topics --zookeeper zookeeper:2181 --alter --topic $t --delete-config ${p}
    done
done

0voto

shubham gulati Points 91

Il existe deux solutions pour nettoyer les données des sujets

  1. Changez le chemin zookeeper dataDir "dataDir=/dataPath" par une autre valeur valeur, supprimer le dossier kafka logs et redémarrer zookeeper et kafka serveur

  2. Exécuter zkCleanup.sh depuis le serveur zookeeper

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X