Je ne pense pas qu'il soit encore supporté. Jetez un coup d'œil à ceci Question JIRA "Support pour l'ajout et la suppression de sujets".
Pour supprimer manuellement :
- Arrêter le cluster
- Nettoyer le répertoire des journaux de kafka (spécifié par l'option
log.dir
attribut dans kafka config ) ainsi que les données du zookeeper
- Redémarrer le cluster
Pour un sujet donné, vous pouvez
- Arrêter kafka
- Nettoyer le journal de kafka spécifique à la partition, kafka stocke son fichier journal dans un format de "logDir/topic-partition" donc pour un sujet nommé "MonSujet" le journal pour la partition id 0 sera stocké dans
/tmp/kafka-logs/MyTopic-0
donde /tmp/kafka-logs
est spécifié par l'option log.dir
attribut
- Redémarrer kafka
Esto es NOT
une approche bonne et recommandée mais elle devrait fonctionner. Dans le fichier de configuration du courtier Kafka, l'élément log.retention.hours.per.topic
est utilisé pour définir The number of hours to keep a log file before deleting it for some specific topic
En outre, existe-t-il un moyen de supprimer les messages dès que le consommateur les lit ?
Desde el Documentation sur Kafka :
Le cluster Kafka conserve tous les messages publiés, qu'ils aient été consommés ou non, pendant une période de temps configurable. Par exemple, si la rétention du journal est définie sur deux jours, alors pendant les deux jours suivant la publication d'un message, celui-ci est disponible pour être consommé, après quoi il sera écarté pour libérer de l'espace. Les performances de Kafka sont effectivement constantes par rapport à la taille des données, la rétention d'un grand nombre de données n'est donc pas un problème.
En fait, la seule métadonnée conservée pour chaque consommateur est la position du consommateur dans le journal, appelée "offset". Ce décalage est contrôlé par le consommateur : normalement, un consommateur avance son décalage de façon linéaire au fur et à mesure qu'il lit les messages, mais en fait la position est contrôlée par le consommateur et il peut consommer les messages dans l'ordre qu'il souhaite. Par exemple, un consommateur peut revenir à un décalage plus ancien pour retraiter.
Pour trouver le décalage de début de lecture dans Kafka 0.8 Exemple simple de consommateur ils disent
Kafka inclut deux constantes pour vous aider, kafka.api.OffsetRequest.EarliestTime()
trouve le début des données dans les journaux et commence la diffusion à partir de là, kafka.api.OffsetRequest.LatestTime()
ne diffusera que les nouveaux messages.
Vous pouvez également y trouver le code d'exemple pour gérer le décalage au niveau du consommateur.
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}