2 votes

Apache Flink nombre dynamique de Sinks

J'utilise Apache Flink et le KafkaConsumer pour lire certaines valeurs d'un sujet Kafka. J'ai également un flux obtenu à partir de la lecture d'un fichier.

En fonction des valeurs reçues, je voudrais écrire ce flux sur différents sujets Kafka.

En fait, j'ai un réseau avec un leader lié à de nombreux enfants. Pour chaque enfant, le leader doit écrire le flux lu dans un sujet Kafka spécifique à l'enfant, afin que l'enfant puisse le lire. Lorsque l'enfant est lancé, il s'enregistre dans le sujet Kafka lu par le Leader. Le problème est que je ne sais pas apriori combien d'enfants j'ai.

Par exemple, j'ai lu 1 depuis le Kafka Topic, je veux écrire le flux dans un seul Kafka Topic nommé Topic1. Je lis 1-2 et je veux écrire sur deux Kafka Topic. (Topic1 et Topic2)

Je ne sais pas si c'est possible car pour écrire sur le sujet, j'utilise le producteur Kafka avec la méthode AddSink et, d'après ce que j'ai compris (et d'après mes tentatives), il semble que Flink ait besoin de connaître le nombre de puits apriori.

Mais alors, il n'y a aucun moyen d'obtenir un tel comportement ?

1voto

Helder Pereira Points 1254

Si j'ai bien compris votre problème, je pense que vous pouvez le résoudre avec un seul sink, puisque vous pouvez choisir le topic Kafka en fonction de l'enregistrement traité. Il semble également qu'un élément de la source puisse être écrit dans plus d'un sujet, auquel cas vous auriez besoin d'un filtre de type FlatMapFunction pour répliquer chaque enregistrement source N fois (un pour chaque sujet de sortie), que je recommande de sortir en tant que paire (alias Tupple2 ) avec (sujet, enregistrement).

DataStream<Tupple2<String, MyValue>> stream = input.flatMap(new FlatMapFunction<>() {
    public void flatMap(MyValue value, Collector<Tupple2<String, MyValue>> out) {
        for (String topic : topics) {
            out.collect(Tupple2.of(topic, value));
        }
    }
});

Ensuite, vous pouvez utiliser la rubrique précédemment calculée en créant la rubrique FlinkKafkaProducer avec un KeyedSerializationSchema dans laquelle vous mettez en œuvre getTargetTopic pour retourner le premier élément de la paire.

stream.addSink(new FlinkKafkaProducer10<>(
        "default-topic",
        new KeyedSerializationSchema<>() {
            public String getTargetTopic(Tupple2<String, MyValue> element) {
                return element.f0;
            }
            ...
        },
        kafkaProperties)
);

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X