2 votes

IllegalStateException Les abonnements aux thèmes, aux partitions et aux modèles sont mutuellement exclusifs.

Besoin de récupérer des messages d'un sujet Kafka, à partir d'un décalage particulier

Cause du blocage de l'exception IllegalStateException à l'adresse assign()

Si je n'utilise pas assign() le consommateur n'effectue pas de recherche, car il s'agit d'une opération paresseuse.

Objectif réel : Besoin d'itérer les messages sur le sujet à partir d'un décalage prédéfini jusqu'à la fin. Ce décalage prédéfini est calculé à markOffset()

static void fetchMessagesFromMarkedOffset() {
    Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
    consumer.assign(set); // <---- Exception at this place
    map.forEach((k,v) -> {
        consumer.seek(k, v-3);
    });
    ConsumerRecords<Long, String> consumerRecords = consumer.poll(100);
    consumerRecords.forEach(record -> {
        System.out.println("Record Key " + record.key());
        System.out.println("Record value " + record.value());
        System.out.println("Record partition " + record.partition());
        System.out.println("Record offset " + record.offset());
    });
    consumer.close();
}

Reste du code concerné

public static Set<TopicPartition> set;
public static Map<TopicPartition, Long> map;

static void markOffset() {
    Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
    consumer.poll(100);
    set = consumer.assignment();
    map = consumer.endOffsets(set);
    System.out.println("Topic  Partitions: " + set);
    System.out.println("End Offsets: " + map);
}

Création de consommateurs

private Consumer createConsumer(String topicName) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "capacity-service-application");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    final Consumer consumer = new KafkaConsumer(props);
    consumer.subscribe(Collections.singletonList(topicName));
    return consumer;
}   

Exception

Exception in thread "main" java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
at org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:104)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromUser(SubscriptionState.java:157)
at org.apache.kafka.clients.consumer.KafkaConsumer.assign(KafkaConsumer.java:1064)
at com.gaurav.kafka.App.fetchMessagesFromMarkedOffset(App.java:44)
at com.gaurav.kafka.App.main(App.java:30)

4voto

Vous ne pouvez pas mélanger manual y automatic l'affectation de la partition. Vous devez utiliser KafkaConsumer::subscribe o KafkaConsumer::assign mais pas les deux.

Si, après avoir appelé KafkaConsumer::subscribe vous souhaitez passer à manual vous devez d'abord appeler KafkaConsumer::unsubscribe .

Selon le https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Notez qu'il n'est pas possible de combiner l'attribution manuelle de partitions (c'est-à-dire en utilisant assign) avec l'attribution dynamique de partitions par l'intermédiaire de l'abonnement aux thèmes (c'est-à-dire en utilisant subscribe).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X