5 votes

Comment faire des tests fonctionnels pour les flux Kafka avec Avro (schemaRegistry) ?

  • Une brève explication de ce que je souhaite réaliser : Je veux faire des tests fonctionnels pour une topologie de flux kafka (en utilisant TopologyTestDriver) pour les enregistrements avro.

  • Enjeux : Impossible de "simuler" schemaRegistry pour automatiser la publication/lecture des schémas.

Ce que j'ai essayé jusqu'à présent, c'est d'utiliser MockSchemaRegistryClient pour essayer de simuler le schemaRegistry, mais je ne sais pas comment le relier à l'Avro Serde.

Code

public class SyncronizerIntegrationTest {

    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();

    @Test
    void integrationTest() throws IOException, RestClientException {

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); //Dunno if this do anything? :/
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());

        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }
}

Méthode AvroSerde

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}

Si j'exécute le test sans testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking())); il fonctionne bien (il semble que tout soit bien installé)

Mais

Lorsque j'essaie d'insérer des données ( pipeInput ), il lance l'exception suivante : L'objet "Tracking" est plein.

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

Modifié, je n'ai pas supprimé cela, pour que le "journal historique" fournisse le chemin suivi.

3voto

cricket_007 Points 6938

Confluent fournit une pléthore d'exemples de code pour tester Kafka (Streams) avec le Schema Registry.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Plus important encore, le mocking n'est pas un test d'intégration complet - démarrer un courtier Kafka réel avec un registre de schémas en mémoire l'est.

Dans le code ci-dessus, voir

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

Y

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());

3voto

Matthias J. Sax Points 25989

Clause de non-responsabilité : je n'ai pas testé ce produit. Il s'agit simplement d'idées que je partage sur la manière dont vous pourriez faire fonctionner ce système. J'espère que cela vous aidera. Si vous pouvez apporter des commentaires à cette réponse, nous serions ravis d'obtenir une solution correcte et opérationnelle.

Je ne pense pas que vous puissiez utiliser le Avro Serde normal via config :

props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());

D'après ce que j'ai compris, il essaiera de se connecter à

props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

Cependant, l'utilisation de MockSchemaRegistryClient il n'y a pas de point d'accès http auquel se connecter. Au lieu de cela, vous devez passer le client fictif au Serde lorsque vous le créez :

MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
// add the schemas you want to use
schemaRegistryClient.register(...);
SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);

Ainsi, il suffit de configurer un point d'accès http "factice", car le client fictif ne l'utilisera pas de toute façon.

La transmission du Serde correspondant par le biais d'un code comme celui-ci semble correcte :

StreamBuilder.stream("topic", Consumed.with(Serdes.String(), avroSerde));

3voto

Levani Kokhreidze Points 105

L'approche qui a le mieux fonctionné pour nous est la suivante conteneurs de test java avec les images docker de la plateforme Confluent. Vous pouvez configurer le fichier docker compose suivant :

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.0.0
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    environment:
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
    ports:
      - 8081:8081
    depends_on:
      - zookeeper
      - kafka

La seule chose que vous devez faire est d'ajouter 127.0.0.1 kafka a /etc/hosts . Avec cette approche, vous disposez essentiellement d'un cluster entier opérationnel pour votre test d'intégration. La grappe sera détruite une fois le test d'intégration terminé.

EDITAR:

Améliorer docker-compose sans modifier /etc/hosts

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:5.0.0
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
    extra_hosts:
      - "moby:127.0.0.1"

Kafka sera disponible sur localhost:9092

1voto

Vassilis Points 131

Pour ce faire, j'ai fini par créer une petite bibliothèque de tests basée sur testcontainers : https://github.com/vspiliop/embedded-kafka-cluster . Démarre un cluster Kafka basé sur docker entièrement configurable (broker, zookeeper et Confluent Schema Registry) dans le cadre de vos tests. Consultez les exemples de tests unitaires et de tests cucumber.

La principale différence avec d'autres solutions non basées sur docker (par exemple spring-boot embedded kafka test) est que le fichier docker compose est "généré" via les paramètres de l'annotation @EmbeddedKafkaCluster et qu'il n'est pas codé en dur. Cela signifie que vous pouvez configurer vos tests pour correspondre à 100% à la production et être sûr que toutes les versions correspondent à votre cluster de production en définissant le paramètre confluent platformVersion .

En outre, vous pouvez utiliser des outils comme toxi-proxy pour écrire des tests unitaires qui testent le comportement de votre cluster réel lorsque certaines erreurs de réseau se produisent.

Par exemple, vous pouvez utiliser l'annotation @EmbeddedKafkaCluster comme suit :

@ContextConfiguration()
@EmbeddedKafkaCluster(topics = {"test.t"}, brokersCount = 1, zookeepersCount = 1, schemaRegistriesCount = 1, platformVersion = "your_production_kafka_confluent_version")
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
public class FeatureSteps {

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X