2 votes

Kafka Connect JDBC OOM - Grande quantité de données

Je suis en train de tenter de mettre en place quelque chose de similaire à ce tutoriel. Cependant, cela a fonctionné car l'ensemble de données est très petit. Comment pourrais-je faire cela pour une table plus grande? Parce que je reçois constamment une erreur de mémoire insuffisante. Mes logs sont

ka.connect.runtime.rest.RestServer:60)
[2018-04-04 17:16:17,937] INFO [Worker clientId=connect-1, groupId=connect-cluster] Marque le coordinateur ip-172-31-14-140.ec2.internal:9092 (id: 2147483647 rack: null) mort (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[2018-04-04 17:16:17,938] ERREUR Exception non attrapée dans le thread de travail de l'herdier, sortie :  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:218)
java.lang.OutOfMemoryError: Espace de heap Java insuffisant
[2018-04-04 17:16:17,939] ERREUR Exception non attrapée dans le thread 'kafka-coordinator-heartbeat-thread | connect-sink-redshift' : (org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread:51)
java.lang.OutOfMemoryError: Espace de heap Java insuffisant
[2018-04-04 17:16:17,940] INFO Arrêt de Kafka Connect (org.apache.kafka.connect.runtime.Connect:65)
[2018-04-04 17:16:17,940] INFO Arrêt du serveur REST (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2018-04-04 17:16:17,940] ERREUR WorkerSinkTask{id=sink-redshift-0} La tâche a provoqué une exception non attrapée et irrécupérable (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Espace de heap Java insuffisant
[2018-04-04 17:16:17,940] ERREUR WorkerSinkTask{id=sink-redshift-0} La tâche est en cours d'arrêt et ne pourra pas récupérer tant qu'elle n'est pas redémarrée manuellement (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,940] INFO Arrêt de la tâche (io.confluent.connect.jdbc.sink.JdbcSinkTask:96)
[2018-04-04 17:16:17,941] INFO WorkerSourceTask{id=production-db-0} Validation des décalages (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2018-04-04 17:16:17,940] ERREUR Exception inattendue dans le Thread[KafkaBasedLog Work Thread - connect-statuses,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.OutOfMemoryError: Espace de heap Java insuffisant
[2018-04-04 17:16:17,946] INFO WorkerSourceTask{id=production-db-0} écoule 0 messages en attente pour la validation du décalage (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
[2018-04-04 17:16:17,954] ERREUR WorkerSourceTask{id=production-db-0} La tâche a provoqué une exception non attrapée et irrécupérable (org.apache.kafka.connect.runtime.WorkerTask:172)
java.lang.OutOfMemoryError: Espace de heap Java insuffisant
[2018-04-04 17:16:17,960] ERREUR WorkerSourceTask{id=production-db-0} La tâche est en cours d'arrêt et ne pourra pas récupérer tant qu'elle n'est pas redémarrée manuellement (org.apache.kafka.connect.runtime.WorkerTask:173)
[2018-04-04 17:16:17,960] INFO [Producteur clientId=producteur-4] Fermeture du producteur Kafka avec timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)
[2018-04-04 17:16:17,960] INFO Le connecteur du serveur s'est arrêté @64f4bfe4{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2018-04-04 17:16:17,967] INFO Le contexte de servlet o.e.j.s.ServletContextHandler@2f06a90b{/,null,INDISPONIBLE} s'est arrêté (org.eclipse.jetty.server.handler.ContextHandler:865)

J'ai également essayé d'augmenter la mémoire avec la suggestion ici mais je suis incapable de charger l'ensemble de la table en mémoire. Y a-t-il un moyen de limiter le nombre de données produites ?

2voto

cricket_007 Points 6938

Pour le connecteur JDBC, la propriété la plus importante que vous pouvez probablement appliquer serait celle-ci, ce qui semble être ce que vous demandez.

batch.max.rows

Nombre maximal de lignes à inclure dans un seul lot lors de la recherche de nouvelles données. Ce paramètre peut être utilisé pour limiter la quantité de données mise en mémoire tampon à l'intérieur du connecteur.

Il n'est pas nécessaire de "mettre l'intégralité de la table en mémoire". Avec des lots plus petits, et des recherches et des validations plus fréquentes, vous pouvez vous assurer que presque toutes les lignes seront analysées, et vous ne serez pas exposé au risque d'un échec d'un gros lot, puis le connecteur s'arrête pendant un certain temps, puis redémarre et en manque quelques lignes lors de la prochaine recherche.

Sinon, assurez-vous de ne pas utiliser le mode de table en bloc, car il essaiera de scanner l'intégralité de la table encore et encore.

Aussi, l'option query peut faire une projection de colonnes sur la table.

Vous pouvez trouver plus d'options de configuration dans la documentation, mais toutes les erreurs OOM devront être soigneusement examinées cas par cas en activant la surveillance JMX et en exportant ces valeurs dans un système agrégé que vous pouvez surveiller de plus près comme Prometheus au lieu de simplement voir l'erreur OOM et ne pas savoir si le changement de paramètre particulier aide vraiment.


Une autre option serait d'utiliser des connecteurs basés sur CDC comme le montre un autre billet de blog

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X