J'utilise Spark pour traiter un volume de données de plus de 20 To. J'essaie d'écrire les données dans une table Hive, en utilisant ce qui suit :
df.registerTempTable('temporary_table')
sqlContext.sql("INSERT OVERWRITE TABLE my_table SELECT * FROM temporary_table")
où df
est le DataFrame Spark. Malheureusement, il n'a pas de dates sur lesquelles je peux partitionner. Lorsque j'ai exécuté le code ci-dessus, j'ai rencontré le message d'erreur :
py4j.protocol.Py4JJavaError : Une erreur s'est produite lors de l'appel à z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe. : org.apache.spark.SparkException : Le travail a été interrompu en raison d'un échec de l'étape : La taille totale des résultats sérialisés de 95561 tâches (1024.0 Mo) est supérieure à spark.driver.maxResultSize (1024.0 Mo)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply$mcI$sp(python.scala:126) at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124) at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087) at org.apache.spark.sql.execution.EvaluatePython$.takeAndServe(python.scala:124) at org.apache.spark.sql.execution.EvaluatePython.takeAndServe(python.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745)
Le message d'erreur semble également dépendre de la quantité de données. Avec des données légèrement plus petites, j'ai rencontré le message d'erreur suivant
Les statuts de sortie de la carte étaient de 395624469 octets, ce qui dépasse spark.akka.frameSize (134217728 octets).
Quel est le moyen le plus pratique d'y parvenir (si la tâche est réalisable) ? J'utilise Spark 1.6.
Voici les variables de configuration lors de la soumission du job spark : spark-submit --deploy-mode cluster --master yarn --executor-memory 20G --num-executors 500 --driver-memory 64g --driver-cores 8 --files 'my_script.py'
BTW, naïvement, j'imaginerais que lorsque les opérations d'écriture se produisent, Spark écrira les données des exécuteurs vers hdfs. Mais le message d'erreur semble impliquer qu'il y a des transferts de données entre les exécuteurs et le pilote ?
Je n'ai qu'une connaissance superficielle de Spark, alors pardonnez-moi pour mes questions stupides !