2 votes

L'entrée kafka de Logstash 5.1.1 ne récupère pas les messages existants sur le sujet

J'ai la configuration suivante de logstash avec une entrée kafka

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["mytopic"]
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
    doc_as_upsert => true
    action => "update"
  }
}

Le problème auquel je suis confronté est que lorsque je lance le logstash, il ne récupère pas les anciens messages sur ce sujet. J'avais l'impression que la première fois que logstash s'exécute, il récupère tous les messages sur un sujet qui n'ont pas été consommés. J'ai vérifié qu'il s'agissait d'un nouveau sujet et qu'il contenait des messages qui n'ont pas été récupérés par logstash lors de son lancement. Il récupère les messages qui arrivent sur le sujet pendant qu'il est en cours d'exécution, mais pas ceux qui existaient avant son démarrage. Est-ce que je manque quelque chose dans la configuration ou est-ce une bizarrerie de l'entrée elle-même ? La garantie des messages est de la plus haute importance pour les besoins de mon entreprise.

6voto

oh54 Points 328

Puisque vous n'avez pas spécifié d'identifiant de groupe pour kafka, les considérations importantes sont les suivantes :

  • Kafka group.id (group_id dans la configuration kafka de logstash) est défini par défaut pour logstash, c'est-à-dire "logstash".
  • La valeur par défaut de Kafka pour enable.auto.commit (enable_auto_commit) dans logstash est "true".
  • Kafka auto.offset.reset (auto_offset_reset) n'a pas de valeur par défaut dans logstash donc je suppose que la valeur par défaut de Kafka de latest est utilisée.

Ainsi, lorsque vous exécutez le consommateur sur un sujet et qu'il ne parvient pas à récupérer les messages déjà présents dans le sujet, l'une des deux choses suivantes se produit probablement :

  1. Il n'y a pas de groupe existant avec le même id de groupe que le consommateur et donc la valeur par défaut de Kafka auto.offset.reset de latest est utilisée et le consommateur ignorera les messages déjà existants.
  2. Il existe un groupe avec le même identifiant de groupe ("logstash") et un consommateur avec cet identifiant de groupe a déjà consommé les messages existants et engagé les offsets (cet autre consommateur peut être celui que vous avez lancé précédemment ou d'autres consommateurs avec le même identifiant de groupe). Cela signifie que les autres consommateurs de ce groupe ne consommeront pas à nouveau ces messages, sauf si on leur demande explicitement de le faire.

Donc, ce que vous voulez probablement faire est de définir une certaine configration Kafka, pour logstash vous devriez être en mesure de définir

group_id => "some_random_group" (un groupe aléatoire)

auto_offset_reset => "earliest" (plus tôt)

Si vous exécutez le consommateur maintenant, puisqu'il n'y a pas d'offsets existants pour some_random_group et que la réinitialisation est la plus précoce, le consommateur devrait consommer tous les messages existants dans un sujet et valider les offsets. Cela signifie que si vous exécutez à nouveau le consommateur après avoir consommé tous les messages, il ne consommera pas les messages existants.

-1voto

Luciano Afranllie Points 1840

Vous devez définir le paramètre du plugin d'entrée kafka auto_offset_reset à "au plus tôt".

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    auto_offset_reset => "earliest"
    topics => ["mytopic"]
  }
}

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X