2 votes

Spark - comment écrire ~20TB de données d'un DataFrame vers une table hive ou hdfs ?

J'utilise Spark pour traiter un volume de données de plus de 20 To. J'essaie d'écrire les données dans une table Hive, en utilisant ce qui suit :

df.registerTempTable('temporary_table')
sqlContext.sql("INSERT OVERWRITE TABLE my_table SELECT * FROM temporary_table")

df est le DataFrame Spark. Malheureusement, il n'a pas de dates sur lesquelles je peux partitionner. Lorsque j'ai exécuté le code ci-dessus, j'ai rencontré le message d'erreur :

py4j.protocol.Py4JJavaError : Une erreur s'est produite lors de l'appel à z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. : org.apache.spark.SparkException : Le travail a été interrompu en raison d'un échec de l'étape : La taille totale des résultats sérialisés de 95561 tâches (1024.0 Mo) est supérieure à spark.driver.maxResultSize (1024.0 Mo)

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply$mcI$sp(python.scala:126)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.execution.EvaluatePython$.takeAndServe(python.scala:124)
at org.apache.spark.sql.execution.EvaluatePython.takeAndServe(python.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

Le message d'erreur semble également dépendre de la quantité de données. Avec des données légèrement plus petites, j'ai rencontré le message d'erreur suivant

Les statuts de sortie de la carte étaient de 395624469 octets, ce qui dépasse spark.akka.frameSize (134217728 octets).

Quel est le moyen le plus pratique d'y parvenir (si la tâche est réalisable) ? J'utilise Spark 1.6.

Voici les variables de configuration lors de la soumission du job spark : spark-submit --deploy-mode cluster --master yarn --executor-memory 20G --num-executors 500 --driver-memory 64g --driver-cores 8 --files 'my_script.py'

BTW, naïvement, j'imaginerais que lorsque les opérations d'écriture se produisent, Spark écrira les données des exécuteurs vers hdfs. Mais le message d'erreur semble impliquer qu'il y a des transferts de données entre les exécuteurs et le pilote ?

Je n'ai qu'une connaissance superficielle de Spark, alors pardonnez-moi pour mes questions stupides !

0voto

vaquar khan Points 2622

Vérifiez la configuration suivante et modifiez-la selon vos besoins ,les valeurs par défaut sont 1 g

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X