42 votes

Pourquoi la jointure échoue-t-elle avec "java.util.concurrent.TimeoutException: les contrats à terme ont expiré après [300 secondes]"?

Je suis à l'aide de l'Étincelle 1.5.

J'ai deux dataframes de la forme:

scala> libriFirstTable50Plus3DF
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int]

scala> linkPersonItemLessThan500DF
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int]

libriFirstTable50Plus3DF a 766,151 dossiers tout en linkPersonItemLessThan500DF a 26,694,353 enregistrements. Notez que je suis en utilisant repartition(number) sur linkPersonItemLessThan500DF depuis que j'ai l'intention de joindre ces deux plus tard. Je suis le code ci-dessus avec:

val userTripletRankDF = linkPersonItemLessThan500DF
     .join(libriFirstTable50Plus3DF, Seq("family_id"))
     .take(20)
     .foreach(println(_))

pour laquelle j'obtiens ce résultat:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200)
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:        at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
 at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190)
 at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
 at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904)
 at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385)
 at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315)
 at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85)
 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87)
 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91)
 at $iwC$$iwC$$iwC.<init>(<console>:93)
 at $iwC$$iwC.<init>(<console>:95)
 at $iwC.<init>(<console>:97)
 at <init>(<console>:99)
 at .<init>(<console>:103)
 at .<clinit>(<console>)
 at .<init>(<console>:7)
 at .<clinit>(<console>)
 at $print(<console>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
 at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
 at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
 at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
 at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
 at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
 at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
 at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
 at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
 at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
 at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

et je ne comprends pas quel est le problème. Est-ce aussi simple que l'augmentation du temps d'attente? Est la jointure trop intensive? J'ai besoin de plus de mémoire? Est le shufffling intensive? Quelqu'un peut-il aider?

67voto

T. Gawęda Points 9535

Cela se produit parce que l'Étincelle qui essaye de faire de la Diffusion de Jointure de Hachage et l'un des DataFrames est très grand, donc l'envoi d'elle consomme beaucoup de temps.

Vous pouvez:

  1. La hausse spark.sql.broadcastTimeout d'augmentation du délai d'attente - spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist() deux DataFrames, puis Étincelle va utiliser Shuffle à la Jointure de référence à partir d' ici

PySpark

Dans PySpark, vous pouvez définir la configuration lorsque vous générez l'étincelle contexte de la manière suivante:

spark = SparkSession
  .builder
  .appName("Your App")
  .config("spark.sql.broadcastTimeout", "36000")
  .getOrCreate()

24voto

Jacek Laskowski Points 6668

Juste pour ajouter un peu de code le contexte de la très concise réponse de @T. Gawęda.


Dans votre application Spark Spark SQL fait choisir une diffusion de jointure de hachage pour la rejoindre parce que "libriFirstTable50Plus3DF a 766,151 dossiers" ce qui s'est passé à moins que le soi-disant diffusion seuil (par défaut à 10 MO).

Vous pouvez contrôler la diffusion de seuil à l'aide de l'étincelle.sql.autoBroadcastJoinThreshold propriété de configuration.

spark.sql.autoBroadcastJoinThreshold Configure la taille maximale, en octets, pour un tableau, qui sera diffusé à tous les nœuds du travailleur lors de l'exécution d'une jointure. En définissant cette valeur à -1 de radiodiffusion peut être désactivé. Il faut noter que les statistiques sont uniquement pris en charge pour la Ruche Metastore tables où la commande ANALYSER la TABLE de CALCULER des STATISTIQUES noscan a été exécuté.

Vous pouvez trouver ce type particulier de rejoindre la trace de la pile:

org.apache.spark.sql.l'exécution.les jointures.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110)

BroadcastHashJoin physique de l'opérateur Spark SQL utilise une diffusion variable pour distribuer le plus petit ensemble de données à Étincelle exécuteurs testamentaires (plutôt que de l'expédition d'un exemplaire de chaque tâche).

Si vous avez utilisé explain à l'examen de la physique plan de requête, vous remarqueriez la requête utilise BroadcastExchangeExec physique de l'opérateur. C'est là que vous pouvez voir les sous-jacents des machines pour la diffusion la plus petite table (et le délai d'attente).

override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
  ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
}

doExecuteBroadcast fait partie d' SparkPlan contrat que chaque opérateur physique Spark SQL suivante qui permet de radiodiffusion, si nécessaire. BroadcastExchangeExec arrive d'en avoir besoin.

Le délai d'attente paramètre est ce que vous cherchez.

private val timeout: Duration = {
  val timeoutValue = sqlContext.conf.broadcastTimeout
  if (timeoutValue < 0) {
    Duration.Inf
  } else {
    timeoutValue.seconds
  }
}

Comme vous pouvez le voir, vous pouvez le désactiver complètement (à l'aide d'une valeur négative), ce qui impliquerait d'attendre la diffusion de la variable à être livrés aux exécuteurs indéfiniment ou utiliser sqlContext.conf.broadcastTimeout - ce qui est exactement étincelle.sql.broadcastTimeout propriété de configuration. La valeur par défaut est 5 * 60 secondes vous pouvez le voir dans la stacktrace:

java.util.de façon concomitante.TimeoutException: contrats à Terme expiré après [300 secondes]

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X