181 votes

Comment les étapes sont divisées en tâches de bougie ?

Supposons pour la suite que seule une Étincelle d'emploi est en cours d'exécution à chaque point dans le temps.

Ce que j'ai jusqu'à présent

Voici ce que je comprends ce qui se passe dans Spark:

  1. Lorsqu'un SparkContext est créé, chaque travailleur nœud démarre un exécuteur testamentaire. Les exécuteurs sont des processus distincts (JVM), qui se connecte au programme pilote. Chaque interprète a le pot de la pilote du programme. Fermeture d'un pilote, s'arrête, les exécuteurs testamentaires. Chaque interprète peut contenir des partitions.
  2. Lorsqu'une tâche est exécutée, un plan d'exécution est créé selon la lignée graphique.
  3. L'exécution du travail est divisé en étapes, où les étapes contenant autant de voisins (dans la lignée graphique) et les transformations de l'action, mais pas de mélange. Ainsi, les étapes sont séparées par des remaniements.

image 1

Je comprends que

  • Une tâche est une commande envoyée par le conducteur d'un exécuteur testamentaire par la sérialisation de l'objet de Fonction.
  • L'exécuteur désérialise (avec le jar du pilote), la commande (de la tâche) et s'exécute sur une partition.

mais

Question(s)

Comment puis-je diviser la scène en ces tâches?

Plus précisément:

  1. Sont les tâches que déterminée par les transformations et les actions ou peut-être plusieurs transformations/actions dans une tâche?
  2. Sont les tâches déterminées par la partition (par exemple, une tâche par par étape par partition).
  3. Sont les tâches déterminées par les nœuds (par exemple, une tâche par étape par nœud)?

Ce que je pense (seulement partielle de la réponse, même si à droite)

Dans https://0x0fff.com/spark-architecture-shufflele shuffle est expliqué avec l'image

enter image description here

et j'ai l'impression que la règle est

chaque étape est divisée en #nombre de partitions de tâches, sans aucun égard pour le nombre de nœuds

Pour ma première image, je dirais que j'aurais 3 carte tâches et 3 réduire les tâches.

Pour l'image de 0x0fff, je dirais il y a 8 carte tâches et 3 réduire les tâches (en supposant qu'il y a seulement trois d'orange et trois vert foncé fichiers).

Des questions ouvertes dans tous les cas

Est-ce exact? Mais même si c'est correct, mes questions ci-dessus ne sont pas tous répondu, parce que c'est toujours ouverte, si de multiples opérations (par exemple, plusieurs cartes) sont dans une tâche ou sont séparés dans l'une des tâches par opération.

Ce que disent les autres

Ce qui est une tâche Spark? Comment l'Étincelle travailleur d'exécuter le fichier jar? et Comment ne l'Apache Spark planificateur de diviser des fichiers en plusieurs tâches? sont similaires, mais je n'ai pas l'impression que ma question a été répondue clairement là.

67voto

javadba Points 2430

Vous avez un très bon aperçu ici. Pour répondre à vos questions

  • Séparé task n'a besoin d'être lancé pour chaque partition de données pour chaque stage. Considérer que chaque partition peut trouver sur distincts emplacements physiques - par exemple, des blocs dans HDFS ou des répertoires/volumes pour un système de fichiers local.

Noter que la présentation de l' Stages est entraîné par l' DAG Scheduler. Cela signifie que les étapes qui ne sont pas interdépendantes peut être soumis à la grappe pour l'exécution en parallèle: cela permet de maximiser la parallélisation de la capacité sur le cluster. Donc, si les activités de nos flux de données peuvent se produire simultanément nous nous attendons à voir de multiples étapes lancé.

Nous pouvons voir que l'action dans les jouets exemple dans lequel nous faisons les types d'opérations suivants:

  • charger des deux sources de données
  • effectuer une certaine opération de carte sur les deux sources de données séparément
  • se joindre à eux
  • effectuer certaines carte et filtre les opérations sur le résultat
  • enregistrer le résultat

Alors combien d'étapes nous permettront de nous retrouver avec?

  • 1 chaque étape de chargement, les deux sources de données en parallèle = 2 étapes
  • Une troisième étape représente l' join qui est dépendante sur les deux autres étapes
  • Remarque: toutes les opérations de suivi de travail sur les données jointes peuvent être effectuées dans la même scène parce qu'ils doivent se produire de façon séquentielle. Il n'y a aucun avantage à lancer de nouvelles étapes, car ils ne peuvent pas commencer à travailler avant la avant l'opération ont été achevés.

Ici, c'est que le jouet du programme

val sfi  = sc.textFile("/data/blah/input").map{ x => val xi = x.toInt; (xi,xi*xi) }
val sp = sc.parallelize{ (0 until 1000).map{ x => (x,x * x+1) }}
val spj = sfi.join(sp)
val sm = spj.mapPartitions{ iter => iter.map{ case (k,(v1,v2)) => (k, v1+v2) }}
val sf = sm.filter{ case (k,v) => v % 10 == 0 }
sf.saveAsTextFile("/data/blah/out")

Et voici le DAG de la suite

enter image description here

Maintenant: combien de tâches ? Le nombre de tâches doit être égale à

Somme de (Stage * #Partitions in the stage)

36voto

pedram bashiri Points 354

Cela pourrait vous aider à mieux comprendre les différentes pièces:

  • Stade: est un ensemble de tâches. Même processus en cours d'exécution contre différents sous-ensembles de données (partitions).
  • Tâche: représente une unité de travail sur une partition d'un distribué dataset. Donc, à chaque étape, nombre de tâches = nombre de partitions, ou comme vous l'avez dit "une tâche par étape par partition".
  • Chaque exécuteur s'exécute sur l'un des fils de conteneur, et chaque conteneur se trouve sur un nœud.
  • Chaque étape utilise plusieurs des exécuteurs, chaque exécuteur est alloué plusieurs vcores.
  • Chaque vcore peut exécuter une tâche à la fois
  • Donc, à tout moment, plusieurs tâches peuvent être exécutées en parallèle. nombre de tâches en cours d'exécution = nombre-de-vcores utilisé.

17voto

Harel Gliksman Points 419

Si je comprends bien il y a 2 ( relative ) des choses que vous confondre:

1) qu'est Ce qui détermine le contenu d'une tâche?

2) qu'est Ce qui détermine le nombre de tâches à exécuter?

Allumage du moteur "colles" ensemble de simples opérations consécutives rdd, par exemple:

rdd1 = sc.textFile( ... )
rdd2 = rdd1.filter( ... )
rdd3 = rdd2.map( ... )
rdd3RowCount = rdd3.count

alors, quand rdd3 est (paresseusement) calculé, étincelle va générer une tâche par partition de rdd1 et chaque tâche d'exécuter le filtre et la carte par ligne de résultat dans rdd3.

Le nombre de tâches est déterminée par le nombre de partitions. Tous CA a défini un certain nombre de partitions. Pour une source CA qui est lu à partir HDFS ( à l'aide de sc.fichier texte( ... ) par exemple), le nombre de partitions est le nombre de temps intermédiaires générés par le format d'entrée. Quelques opérations sur des RDD(s) peut entraîner un EDR avec un nombre différent de partitions:

rdd2 = rdd1.repartition( 1000 ) will result in rdd2 having 1000 partitions ( regardless of how many partitions rdd1 had ).

Un autre exemple est joint:

rdd3 = rdd1.join( rdd2  , numPartitions = 1000 ) will result in rdd3 having 1000 partitions ( regardless of partitions number of rdd1 and rdd2 ).

( La plupart ) des opérations qui modifient le nombre de partitions impliquent un shuffle, Quand nous le faisons par exemple:

rdd2 = rdd1.repartition( 1000 ) 

ce qui se passe réellement est la tâche sur chaque partition de rdd1 doit produire une sortie qui peut être lu par l'étape suivante afin de faire rdd2 ont exactement 1000 partitions ( Comment ils le font? De hachage ou de Tri ). Les tâches de ce côté sont parfois appelées "Carte ( côté ) tâches". Une tâche qui sera, plus tard, sur rdd2 va agir sur une partition ( de rdd2! ) et aurait pour comprendre comment lire/combiner la carte côté sorties pertinentes à cette partition. Les tâches de ce côté sont parfois appelées "Réduire ( de côté ) des tâches".

Les 2 questions sont liées: le nombre de tâches dans un stade est le nombre de partitions ( commun à l'consécutives rdds "collés" ensemble ) et le nombre de partitions d'un edr peuvent changer entre les étapes ( en spécifiant le nombre de partitions à certains shuffle provoquant opération par exemple ).

Une fois l'exécution d'une étape de début de ses tâches peuvent occuper des tâches machines à sous. Le nombre de connexions simultanées à la tâche de slots est numExecutors * ExecutorCores. En général, ceux-ci peuvent être occupés par des tâches de différentes, non-dépendants stades.

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X