J'utilise le client Confluent.Kafka .NET version 1.3.0. Je voudrais commencer à consommer des messages à partir d'un moment donné.
Pour ce faire, je pourrais utiliser OffsetsForTimes
pour obtenir le décalage souhaité et Commit
ce décalage pour cette partition :
private void SetOffset()
{
const string Topic = "myTopic";
const string BootstrapServers = "server1, server2";
var adminClient = new AdminClientBuilder(
new Dictionary<string, string>
{
{ "bootstrap.servers", BootstrapServers },
{ "security.protocol", "sasl_plaintext" },
{ "sasl.mechanisms", "PLAIN" },
{ "sasl.username", this.kafkaUsername },
{ "sasl.password", this.kafkaPassword }
}).Build();
var consumer = new ConsumerBuilder<byte[], byte[]>(
new Dictionary<string, string>
{
{ "bootstrap.servers", BootstrapServers },
{ "group.id", this.groupId },
{ "enable.auto.commit", "false" },
{ "security.protocol", "sasl_plaintext" },
{ "sasl.mechanisms", "PLAIN" },
{ "sasl.username", this.kafkaUsername },
{ "sasl.password", this.kafkaPassword }
}).Build();
// Timestamp to which the offset should be set to
var timeStamp = new DateTime(2020, 3, 1, 0, 0, 0, DateTimeKind.Utc);
var newOffsets = new List<TopicPartitionOffset>();
var metadata = adminClient.GetMetadata(Topic, TimeSpan.FromSeconds(30));
foreach (var topicMetadata in metadata.Topics)
{
if (topicMetadata.Topic == Topic)
{
foreach (var partitionMetadata in topicMetadata.Partitions.OrderBy(p => p.PartitionId))
{
var topicPartition = new TopicPartition(topicMetadata.Topic, partitionMetadata.PartitionId);
IEnumerable<TopicPartitionOffset> found = consumer.OffsetsForTimes(
new[] { new TopicPartitionTimestamp(topicPartition, new Timestamp(timeStamp, TimestampType.CreateTime)) },
TimeSpan.FromSeconds(5));
newOffsets.Add(new TopicPartitionOffset(topicPartition, new Offset(found.First().Offset)));
}
}
}
consumer.Commit(newOffsets);
// Consume messages
consumer.Subscribe(Topic);
var consumerResult = consumer.Consume();
// process message
//consumer.Commit(consumerResult);
}
Cela fonctionne bien si je veux sauter et sauter à un décalage donné si le décalage auquel je souhaite sauter est après le dernier message validé.
Cependant, l'approche ci-dessus ne fonctionnera pas si l'horodatage donné est avant l'horodatage du dernier message validé. Dans le code ci-dessus, si le timeStamp
est antérieure à l'horodatage du dernier message validé, alors OffsetsForTimes
retournera le décalage du dernier message validé + 1. Même si je règle manuellement le décalage sur un décalage inférieur, alors consumer.Commit(newOffsets)
semble n'avoir aucun effet et je reçois le premier message non engagé lors de la consommation.
Y a-t-il un moyen d'y parvenir à partir du code ?