3 votes

Re-consommation des messages Kafka à partir d'un moment donné

J'utilise le client Confluent.Kafka .NET version 1.3.0. Je voudrais commencer à consommer des messages à partir d'un moment donné.

Pour ce faire, je pourrais utiliser OffsetsForTimes pour obtenir le décalage souhaité et Commit ce décalage pour cette partition :

private void SetOffset()
{
    const string Topic = "myTopic";
    const string BootstrapServers = "server1, server2";

    var adminClient = new AdminClientBuilder(
        new Dictionary<string, string>
        {
            { "bootstrap.servers", BootstrapServers },
            { "security.protocol", "sasl_plaintext" },
            { "sasl.mechanisms", "PLAIN" },
            { "sasl.username", this.kafkaUsername },
            { "sasl.password", this.kafkaPassword }
        }).Build();

    var consumer = new ConsumerBuilder<byte[], byte[]>(
        new Dictionary<string, string>
        {
            { "bootstrap.servers", BootstrapServers },
            { "group.id", this.groupId },
            { "enable.auto.commit", "false" },
            { "security.protocol", "sasl_plaintext" },
            { "sasl.mechanisms", "PLAIN" },
            { "sasl.username", this.kafkaUsername },
            { "sasl.password", this.kafkaPassword }
        }).Build();

    // Timestamp to which the offset should be set to
    var timeStamp = new DateTime(2020, 3, 1, 0, 0, 0, DateTimeKind.Utc);

    var newOffsets = new List<TopicPartitionOffset>();
    var metadata = adminClient.GetMetadata(Topic, TimeSpan.FromSeconds(30));
    foreach (var topicMetadata in metadata.Topics)
    {
        if (topicMetadata.Topic == Topic)
        {
            foreach (var partitionMetadata in topicMetadata.Partitions.OrderBy(p => p.PartitionId))
            {
                var topicPartition = new TopicPartition(topicMetadata.Topic, partitionMetadata.PartitionId);

                IEnumerable<TopicPartitionOffset> found = consumer.OffsetsForTimes(
                    new[] { new TopicPartitionTimestamp(topicPartition, new Timestamp(timeStamp, TimestampType.CreateTime)) },
                    TimeSpan.FromSeconds(5));

                newOffsets.Add(new TopicPartitionOffset(topicPartition, new Offset(found.First().Offset)));
            }
        }
    }

    consumer.Commit(newOffsets);

    // Consume messages
    consumer.Subscribe(Topic);
    var consumerResult = consumer.Consume();
    // process message
    //consumer.Commit(consumerResult);
}

Cela fonctionne bien si je veux sauter et sauter à un décalage donné si le décalage auquel je souhaite sauter est après le dernier message validé.

Cependant, l'approche ci-dessus ne fonctionnera pas si l'horodatage donné est avant l'horodatage du dernier message validé. Dans le code ci-dessus, si le timeStamp est antérieure à l'horodatage du dernier message validé, alors OffsetsForTimes retournera le décalage du dernier message validé + 1. Même si je règle manuellement le décalage sur un décalage inférieur, alors consumer.Commit(newOffsets) semble n'avoir aucun effet et je reçois le premier message non engagé lors de la consommation.

Y a-t-il un moyen d'y parvenir à partir du code ?

0voto

Ángel Igualada Points 819

Je ne suis pas un expert mais je vais essayer de vous expliquer comment vous pourriez le faire.

En premier lieu, nous devons mentionner les méthodes subscribe et assign.

Lorsque vous utilisez l'abonnement, vous passez un ou plusieurs sujets. Ainsi, une liste de partitions de chaque sujet est attribuée au consommateur en fonction du nombre de consommateurs dans son groupe. Une partition de sujet est un objet formé par le nom du sujet et le numéro de la partition.

consumer.Subscribe(Topic);

Vous pouvez utiliser assign pour passer les partitions dont le consommateur va lire. Cette méthode n'utilise pas la fonctionnalité de gestion des groupes du consommateur (où pas besoin de group.id) Si je ne me trompe pas, dans la méthode assign vous pouvez spécifier le décalage initial.

consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
consumer.Assign(topicPartition, new Offset(lastConsumedOffset));

Une autre option consiste à utiliser la méthode seek() pour définir le décalage.

consumer.Seek(topicPartitionOffset);

Si vous souhaitez combiner l'abonnement et l'affectation, n'oubliez pas que vous devez utiliser l'option de désabonnement avant.

Une autre option, si vous souhaitez reprendre tous les messages, est de créer un fichier consommateur dans un nouveau groupe de consommateurs différent.

EXEMPLE (À REVOIR)

Je vous laisse un exemple pour le moment, je le vérifierai plus tard. J'ai fait l'exemple en Java parce que je suis plus familier avec lui. Dans cet exemple, je n'utilise pas subscribe, mais assign. D'abord, les partitions du sujet sont récupérées, nous fixons une date de début pour lire les messages, nous créons une carte spécifiant cette date pour chaque partition.

Avec la carte créée, nous obtenons le décalage de chaque partition à la date spécifiée avec la méthode offsetsForTimes. Avec le décalage de chaque partition, nous utilisons seek pour nous déplacer vers ce décalage sur chaque partition et enfin nous consommons les messages.

Je n'ai pas le temps maintenant de vérifier le code, mais je vais le faire. J'espère que cela vous aidera.

        AdminClient client = AdminClient.create(getAdminClientProperties());
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(
                getConsumerProperties());

        String TOPIC = "topic";

        // get info of all partitions of a topic
        List<PartitionInfo> partitionsInfo = consumer.partitionsFor(TOPIC);

        // create TopicPartition list
        Set<TopicPartition> partitions = new HashSet<>();
        for (PartitionInfo p : partitionsInfo) {
            partitions.add(new TopicPartition(p.topic(), p.partition()));
        }

        // Consumer will read from all partitions
        consumer.assign(partitions);
        DateTime timeToStartReadMessagesFrom = new DateTime(2020, 3, 1, 0, 0, 0);

        Map<TopicPartition, Long> timestamps = new HashMap<>();
        for (TopicPartition tp : partitions) {
            timestamps.put(tp, timeToStartReadMessagesFrom.getMillis());
        }
        // get the offset for that time in each partition
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, offsets.get(tp).offset());
        }

        while (true) {
            final ConsumerRecords<String, GenericRecord> consumerRecords = consumer.poll(1000);
            // do something
            break;
        }
        consumer.close();
        System.out.println("DONE");

0voto

Michael Points 405

Vous pouvez le faire si vous attribuez à chaque partition et état le décalage à partir duquel commencer la lecture.

c'est ainsi que vous obtenez la liste des partitions du sujet :

public static List<TopicPartition> GetTopicPartitions(string bootstrapServers, string topicValue) {
    var tp = new List<TopicPartition>();
    using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
        var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
        meta.Topics.ForEach(topic => {
            if (topic.Topic == topicValue) {
                foreach (PartitionMetadata partition in topic.Partitions) {
                    tp.Add(new TopicPartition(topic.Topic, partition.PartitionId));
                }
            }
        });
    }
    return tp;
}

C'est ainsi que l'on trouve le décalage d'un temps particulier :

List<TopicPartition> topic_partitions = frmMain.GetTopicPartitions(mBootstrapServers, txtTopic.Text);

using (var consumer = new ConsumerBuilder<Ignore, string>(cfg).Build()) {
    consumer.Assign(topic_partitions);

    List<TopicPartitionTimestamp> new_times = new List<TopicPartitionTimestamp>();
    foreach (TopicPartition tp in topic_partitions) {
        new_times.Add(new TopicPartitionTimestamp(tp, new Timestamp(dtpNewTime.Value)));
    }

    List<TopicPartitionOffset> seeked_offsets = consumer.OffsetsForTimes(new_times, TimeSpan.FromSeconds(40));
    string s = "";
    foreach (TopicPartitionOffset tpo in seeked_offsets) {
        s += $"{tpo.TopicPartition}: {tpo.Offset.Value}\n";
    }
    Console.WriteLine(s);
    consumer.Close();
}

C'est ainsi que l'on consomme en attribuant à tous les thèmes la partition et les décalages spécifiques :

using (var consumer =
    new ConsumerBuilder<string, string>(config)
        .SetErrorHandler((_, e) => Log($"Error: {e.Reason}"))
        .Build()) {
    consumer.Assign(seeked_offsets);

    try {
        while (true) {
            try {
                var r = consumer.Consume(cancellationToken);
                // do something with r
            } catch (ConsumeException e) {
                //Log($"Consume error: {e.Error.Reason}");
            }
        }
    } catch (OperationCanceledException) {
        //Log("Closing consumer.");
        consumer.Close();
    }
}

L'autre option, si vous insistez pour appliquer cela au groupe de consommateurs, serait de réinitialiser le groupe de consommateurs et d'utiliser votre code, ou de créer un nouveau groupe de consommateurs.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X