5 votes

Streaming structuré Spark et gestion des décalages Kafka

Je cherche à stocker les offsets kafka à l'intérieur de kafka pour Spark Structured Streaming, comme cela fonctionne pour DStreams. stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) Je cherche la même chose, mais pour le streaming structuré. Le système supporte-t-il le streaming structuré ? Si oui, comment puis-je le réaliser ?

Je connais le point de contrôle HDF en utilisant .option("checkpointLocation", checkpointLocation) mais ce qui m'intéresse, c'est la gestion intégrée des compensations.

Je m'attends à ce que Kafka stocke les offsets uniquement à l'intérieur sans point de contrôle de spark hdfs.

0voto

Jkulkarni Points 1

J'utilise ce morceau de code trouvé quelque part.

public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}

SaveOffset... est appelé une fois que le traitement de l'enregistrement est réussi, sinon aucun offset n'est stocké. et j'utilise les sujets Kafka comme source, donc je spécifie les startingoffsets comme les offsets récupérés de ReadOffsets...

0voto

mike Points 9735

"Est-ce qu'il supporte le streaming structuré ?"

Non, Structured Streaming ne permet pas de renvoyer les offsets vers Kafka, comme cela pourrait être fait avec Spark Streaming (DStreams). Le Guide d'intégration de Spark Structured Streaming + Kafka sur Configurations spécifiques à Kafka est très précis à ce sujet :

"La source Kafka ne commet aucun décalage."

J'ai rédigé une réponse plus complète à ce sujet dans le document suivant Comment définir manuellement les offsets groupId et commit Kafka dans Spark Structured Streaming ? .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X