El @KafkaListener
nécessite KafkaListenerContainerFactory
@Bean
qui, à son tour, se fonde sur le modèle de l ConsumerFactory
. Et le DefaultKafkaConsumerFactory
accepte un Map<String, Object>
de configurations de consommateurs :
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
...
return props;
}
}
https://docs.spring.io/spring-kafka/docs/1.2.2.RELEASE/reference/html/_reference.html#__kafkalistener_annotation
Où cela ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
est exactement l'Apache Kafka standard bootstrap.servers
propriété :
Une liste de paires hôte/port à utiliser pour établir la connexion initiale au cluster Kafka. Le client utilisera tous les serveurs, quels que soient les serveurs spécifiés ici pour l'amorçage. Cette liste n'a d'impact que sur les hôtes initiaux utilisés pour découvrir l'ensemble des serveurs. Cette liste doit être de la forme host1:port1,host2:port2,..... Puisque ces serveurs ne sont utilisés que pour la connexion initiale afin de découvrir l'ensemble des membres de la grappe (qui peut changer dynamiquement), cette liste ne doit pas nécessairement contenir l'ensemble des serveurs (vous pouvez en vouloir plus d'un, cependant, au cas où un serveur serait hors service).
Non, vous ne pouvez pas indiquer l'adresse de Zookeeper. Ce n'est plus supporté par Kafka.