15 votes

KafkaConsumer 0.10 Java API message d'erreur : Aucune attribution actuelle pour la partition.

Je suis en train d'utiliser l'API Java KafkaConsumer 0.10. Je veux consommer à partir d'une partition spécifique et d'un décalage spécifique. J'ai cherché et j'ai trouvé qu'il existe une méthode seek mais elle lance une exception. Est-ce que quelqu'un a eu un cas d'utilisation similaire ou une solution ?

Code :

KafkaConsumer consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);

Exception

java.lang.IllegalStateException: Pas d'attribution actuelle pour la partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
    at xx.xxx.xxx.Test.main(Test.java:182)

31voto

Matthias J. Sax Points 25989

Avant de pouvoir seek(), vous devez d'abord vous subscribe() à un sujet ou assign() une partition d'un sujet au consommateur. Gardez également à l'esprit que subscribe() et assign() sont paresseux -- donc, vous devez également faire un "appel factice" à poll() avant de pouvoir utiliser seek().

Remarque : à partir de Kafka 2.0, le nouveau poll(Duration timeout) est asynchrone et il n'est pas garanti que vous ayez une attribution complète lorsque poll retourne. Ainsi, vous pourriez avoir besoin de vérifier votre attribution avant d'utiliser seek() et également de poll à nouveau pour rafraîchir l'attribution. (Voir KIP-266 pour plus de détails)

Si vous utilisez subscribe(), vous utilisez la gestion de groupe : ainsi, vous pouvez démarrer plusieurs consommateurs en utilisant le même group.id et toutes les partitions du sujet seront attribuées de manière égale à tous les consommateurs au sein du groupe automatiquement (chaque partition sera attribuée à un seul consommateur dans le groupe).

Si vous souhaitez lire des partitions spécifiques, vous devez utiliser l'attribution manuelle via assign(). Cela vous permet de faire n'importe quelle attribution que vous souhaitez.

À propos : KafkaConsumer a une documentation de classe Java très longue et détaillée incluant des exemples. Il vaut la peine de la lire.

1voto

Adam111p Points 1553

Si vous ne voulez pas utiliser poll() pour récupérer les enregistrements de la carte et modifier le décalage lui-même. Version Kafka 0.11 Essayez ceci :

...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");    
KafkaConsumer consumer = new KafkaConsumer<>(props);    
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList());
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator"); 
coordinatorField.setAccessible(true);    

ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000);//Faites attention à vos paramètres de date et d'heure locaux
consumer.seekToBeginning(partitions); //ou autre décalage

Scruter les événements du coordinateur. Cela garantit que le coordinateur est connu et que le consommateur a rejoint le groupe (s'il utilise la gestion de groupe). Cela gère également les validations périodiques des décalages s'ils sont activés.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X