J'essaie d'utiliser Spring Cloud Stream pour intégrer Kafka. Le message écrit est un POJO Java et, bien que cela fonctionne comme prévu (le message est écrit dans le sujet et je peux le lire avec une application consommatrice), certains caractères inconnus sont ajoutés au début du message, ce qui pose problème lorsque j'essaie d'intégrer Kafka Connect pour recevoir les messages du sujet.
Avec la configuration par défaut, c'est le message qui est poussé vers Kafka :
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}
Si je configure le producteur Kafka dans l'application Java, le message est écrit dans le sujet sans les caractères de tête / en-têtes :
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<String, Object>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Message sur Kafka :
{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}
Étant donné que je ne fais que définir les sérialiseurs clé/valeur, je m'attendais à pouvoir le faire dans l'interface de l'utilisateur. application.yml
plutôt que de le faire dans le code. Cependant, lorsque le fichier yml est mis à jour pour spécifier les sérialiseurs, il ne fonctionne pas comme je l'attendais, c'est-à-dire qu'il ne génère pas le même message que le producteur configuré en Java (ci-dessus) :
spring:
profiles: local
cloud:
stream:
bindings:
session:
destination: session
contentType: application/json
kafka:
binder:
brokers: localhost
zkNodes: localhost
defaultZkPort: 2181
defaultBrokerPort: 9092
bindings:
session:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
Message sur Kafka :
"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"
Devrait-il être possible de le configurer uniquement par le biais du yml de l'application ? Y a-t-il des paramètres supplémentaires qui manquent ?