J'utilise ce morceau de code trouvé quelque part.
public class OffsetManager {
private String storagePrefix;
public OffsetManager(String storagePrefix) {
this.storagePrefix = storagePrefix;
}
/**
* Overwrite the offset for the topic in an external storage.
*
* @param topic - Topic name.
* @param partition - Partition of the topic.
* @param offset - offset to be stored.
*/
void saveOffsetInExternalStore(String topic, int partition, long offset) {
try {
FileWriter writer = new FileWriter(storageName(topic, partition), false);
BufferedWriter bufferedWriter = new BufferedWriter(writer);
bufferedWriter.write(offset + "");
bufferedWriter.flush();
bufferedWriter.close();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* @return he last offset + 1 for the provided topic and partition.
*/
long readOffsetFromExternalStore(String topic, int partition) {
try {
Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
private String storageName(String topic, int partition) {
return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
}
}
SaveOffset... est appelé une fois que le traitement de l'enregistrement est réussi, sinon aucun offset n'est stocké. et j'utilise les sujets Kafka comme source, donc je spécifie les startingoffsets comme les offsets récupérés de ReadOffsets...