3 votes

Sérialisation JSON du producteur Kafka

J'essaie d'utiliser Spring Cloud Stream pour intégrer Kafka. Le message écrit est un POJO Java et, bien que cela fonctionne comme prévu (le message est écrit dans le sujet et je peux le lire avec une application consommatrice), certains caractères inconnus sont ajoutés au début du message, ce qui pose problème lorsque j'essaie d'intégrer Kafka Connect pour recevoir les messages du sujet.

Avec la configuration par défaut, c'est le message qui est poussé vers Kafka :

     contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}

Si je configure le producteur Kafka dans l'application Java, le message est écrit dans le sujet sans les caractères de tête / en-têtes :

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");
        configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);

        return new DefaultKafkaProducerFactory<String, Object>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Message sur Kafka :

{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}

Étant donné que je ne fais que définir les sérialiseurs clé/valeur, je m'attendais à pouvoir le faire dans l'interface de l'utilisateur. application.yml plutôt que de le faire dans le code. Cependant, lorsque le fichier yml est mis à jour pour spécifier les sérialiseurs, il ne fonctionne pas comme je l'attendais, c'est-à-dire qu'il ne génère pas le même message que le producteur configuré en Java (ci-dessus) :

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

Message sur Kafka :

"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"

Devrait-il être possible de le configurer uniquement par le biais du yml de l'application ? Y a-t-il des paramètres supplémentaires qui manquent ?

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X