2 votes

Kafka : Chaque appel poll() ne doit-il consommer qu'un seul sujet à la fois ?

J'ai un seul consommateur Kafka qui consomme à partir de plusieurs sujets Kafka. J'aimerais pouvoir utiliser une écriture par lot dans ma destination via un appel d'entrée/sortie pour 100 messages, mais pour pouvoir faire un lot, tous les messages doivent provenir du même sujet.

Si j'ai plusieurs sujets (disons 5), et lorsque consumer.poll ou consumer.consume se produit, et que je reçois par exemple 100 messages à chaque sondage, y a-t-il un moyen de s'assurer que ceux-ci proviennent tous du même sujet, de sorte que ces messages puissent être écrits par lots vers la même destination ? De sorte que le prochain appel .poll reçoive le prochain sujet ?

2voto

senseiwu Points 1716

Il n'est pas possible d'interroger par sujet - vous êtes abonné à une liste de sujets et chaque sujet peut avoir plusieurs partitions. Un sondage donné récupère un ConsumerRecords qui est un conteneur de ConsumerRecord . A ConsumerRecord représente une paire KV qui appartient à l'une des partitions de l'un des sujets auxquels vous avez souscrit.

Kafka essaie d'attribuer un TopicPartition aux consommateurs formant un seul groupe sur la base d'un cédant. Si vous n'avez qu'un seul consommateur, il réclamera toutes les partitions de tous les sujets. Rien ne vous empêche alors de procéder à des regroupements au sein de votre code d'application, bien que

par exemple

private void consume() {
    List<String> topics = List.of("topic1", "topic2", "topic3", "topic4", "topic5");
    kafkaConsumer.subscribe(topics);

    while (true) {
        ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);

        topics.forEach(s -> {
            List<ConsumerRecord<String, String>> recordsPerTopicPartition = new ArrayList<>();
            consumerRecords.records(s).forEach(recordsPerTopicPartition::add);
            doWhatever(recordsPerTopicPartition);
        });
    }
}

private void doWhatever(List<ConsumerRecord<String, String>> consumerRecords) {
    //process
}

1voto

Everv0id Points 1027

Une autre façon de traiter les sujets est la suivante : chaque ProducerRecord a une méthode topic() qui renvoie le nom du sujet pour cet enregistrement. Vous pouvez ensuite regrouper par thème et faire ce que vous voulez avec les paires de thèmes et la collection d'enregistrements pour ce thème.

Mais je suggère fortement d'utiliser un KafkaConsumer distinct pour chaque sujet distinct si vous voulez gérer les sujets indépendamment.

0voto

fhussonnois Points 708

Une solution pour s'abonner à plusieurs sujets tout en interrogeant les messages sujet par sujet est d'utiliser les méthodes pause/reprise.

Voici un exemple :

        List<String> subscription = List.of("topic-a", "topic-b");
        consumer.subscribe(suubscription);

        final Map<String, List<TopicPartition>> partitionsPerTopic = 
                consumer.assignment()
                .stream()
                .collect(Collectors.groupingBy(TopicPartition::topic, Collectors.toList()));

        int next = 0;
        consumer.pause(consumer.assignment());
        // Starting consumption
        while (!closed.get()) {
            // Resuming consumption for next topic
            final String topic = subscription.get(next);
            consumer.resume(partitionsPerTopic.get(topic));

            consumer.poll(Duration.ofMillis(500)).forEach( records -> {
            ...
            });   

            // Pausing consumption for current topic
            consumer.pause(partitionsPerTopic.get(topic));
            next = (next + 1) % subscription.size();
       }

Cependant, cette solution pourrait ne pas être efficace car le consommateur peut encore avoir des messages en mémoire tampon pour les partitions récupérables précédentes. Ces messages seront vidés et récupérés à la deuxième itération.

En d'autres termes, cette solution peut augmenter les allers-retours du réseau entre le consommateur et les courtiers.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X