75 votes

Comment faire de bons exemples reproductibles d'Apache Spark

J'ai passé une bonne quantité de temps à lire à travers quelques questions avec le et balises et très souvent je trouve que les affiches ne fournissent pas suffisamment d'informations pour bien comprendre leur question. J'ai l'habitude de commentaire demandant d'afficher un MCVE mais parfois les amener à montrer certains de l'échantillon d'entrée/sortie de données, c'est comme arracher des dents. Par exemple: voir les commentaires sur cette question.

Peut-être une partie du problème est que les gens ne savent pas comment créer facilement un MCVE pour spark-dataframes. Je pense qu'il serait utile d'avoir une étincelle dataframe version de ce pandas question comme un guide qui peut être lié.

Alors, comment fait-on pour la création d'un bon, reproductible exemple?

71voto

pault Points 12252

Fournir des données échantillon, qui peut être facilement recréé.

À tout le moins, les affiches doivent permettre à un couple de lignes et de colonnes sur leurs dataframe et le code qui peut être utilisé pour créer facilement il. Par facile, je veux dire couper et coller. Rendre aussi petite que possible afin de démontrer votre problème.


J'ai le texte suivant dataframe:

+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+

qui peut être créé avec ce code:

df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)

Montrer la sortie souhaitée.

Posez votre question et nous montrer votre sortie désirée.


Comment puis-je créer une nouvelle colonne 'is_divisible' qui a de la valeur 'yes' si le jour du mois de l' 'date' plus de 7 jours est divisible par la valeur dans la colonne'X', et 'no' autrement?

Sortie désirée:

+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+

Expliquer comment obtenir votre sortie.

Expliquer, en détail, comment vous obtenez votre sortie désirée. Il permet de montrer un exemple de calcul.


Par exemple, dans la ligne 1, X = 1 et la date = 2017-01-01. L'ajout de 7 jours à date de rendements 2017-01-08. Le jour du mois est de 8 et, depuis le 8 est divisible par 1, la réponse est "oui".

De même, pour la dernière ligne X = 7 et la date = 2017-01-04. L'ajout de 7 à la date des rendements de 11, le jour du mois. Depuis 11 % 7 n'est pas 0, la réponse est "non".


Partagez vos code existant.

Montrez-nous ce que vous avez fait ou essayé, y compris tous les* du code, même si elle ne fonctionne pas. Dites-nous où vous êtes coincé et si vous recevez un message d'erreur, veuillez inclure le message d'erreur.

(*Vous pouvez laisser le code pour créer l'étincelle contexte, mais vous devez inclure toutes les importations.)


Je sais comment faire pour ajouter une nouvelle colonne, c'est - date plus de 7 jours, mais je vais avoir du mal à obtenir le jour du mois sous forme de nombre entier.

from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))

Inclure les versions, les importations, et d'utiliser la coloration syntaxique


Pour le réglage des performances des postes, inclure le plan d'exécution


L'analyse de l'étincelle fichiers de sortie

  • MaxU utiles code dans cette réponse pour aider à analyser l'Étincelle fichiers de sortie dans un DataFrame.

D'autres notes.

26voto

hi-zir Points 19277

Réglage des performances

Si la question est liée à l'optimisation des performances veuillez inclure les informations suivantes.

Plan D'Exécution

Il est préférable d'inclure de l' étendue du plan d'exécution. En Python:

df.explain(True) 

En Scala:

df.explain(true)

ou étendue du plan d'exécution avec les statistiques. En Python:

print(df._jdf.queryExecution().stringWithStats())

en Scala:

df.queryExecution.stringWithStats

Mode et informations sur le cluster

  • mode - local, client, `cluster.
  • Le gestionnaire de Cluster (le cas échéant) néant (mode local), autonome, de FIL, de Mesos, Kubernetes.
  • La configuration de base de l'information (nombre de cœurs, de l'exécuteur testamentaire de la mémoire).

Des informations de synchronisation

lent est relatif, surtout quand vous le port de la non-application distribuée ou vous vous attendez à faible latence. Horaires exactes pour les différentes tâches et les étapes, peut être récupéré à partir de l'Étincelle de l'INTERFACE utilisateur (sc.uiWebUrl) jobs d'Étincelle ou de REPOS de l'INTERFACE utilisateur.

Utilisation standarized noms pour les contextes

À l'aide de noms pour chaque contexte nous permet rapidement de reproduire le problème.

  • sc - pour SparkContext.
  • sqlContext - pour SQLContext.
  • spark - pour SparkSession.

Fournir des informations de type (Scala)

Puissant l'inférence de type est l'une des caractéristiques les plus utiles de la Scala, mais il rend difficile pour analyser le code sortis de leur contexte. Même si le type est évident, d'après le contexte, il est préférable d'annoter les variables. Préférez

val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))

plus

val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))

Outils couramment utilisés peuvent vous aider:

  • spark-shell / Scala shell

    utiliser :t

    scala> val rdd = sc.textFile("README.md")
    rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
    
    scala> :t rdd
    org.apache.spark.rdd.RDD[String]
    
  • InteliJ Idée

    Utilisez Alt + =

17voto

desertnaut Points 15220

Bonne question et réponse; quelques autres suggestions:

Inclure votre Spark version

L'étincelle est toujours en évolution, mais pas si rapidement que dans les jours de 1.x. Il est toujours (mais surtout si vous utilisez un peu ancienne version) une bonne idée d'inclure votre version de travail. Personnellement, je commence toujours mes réponses avec:

spark.version
# u'2.2.0'

ou

sc.version
# u'2.2.0'

Y compris votre version de Python, trop, n'est jamais une mauvaise idée.


Inclure toutes vos importations

Si votre question n'est pas strictement sur Spark SQL & dataframes, par exemple, si vous avez l'intention d'utiliser votre dataframe dans certains d'apprentissage de la machine de l'opération, être explicite sur vos importations - voir cette question, où les importations ont été ajoutés dans l'OP qu'après de nombreux échanges dans l' (maintenant supprimé) commentaires (et s'est avéré que ces mauvais importations ont été la cause racine du problème).

Pourquoi est-ce nécessaire? Parce que, par exemple, cette LDA

from pyspark.mllib.clustering import LDA

est différent de ce LDA:

from pyspark.ml.clustering import LDA

la première venue de l'ancien, CA API (anciennement Étincelle MLlib), tandis que la seconde à partir de la nouvelle dataframe API (Étincelle ML).


Inclure la mise en valeur du code

OK, j'avoue c'est subjectif: je crois que PySpark questions ne devraient pas être étiqueté comme python par défaut; la chose est, python balise donne automatiquement la mise en valeur du code (et je crois que c'est une des raisons principales pour ceux qui l'utilisent pour PySpark questions). De toute façon, si vous l'acceptez, et vous tiens encore une belle, a souligné le code, il suffit de comprendre les démarque de la directive:

<!-- language-all: lang-python -->

quelque part dans ton post, avant votre premier extrait de code.

[Mise à JOUR: j'ai demandé automatique de la syntaxe pour l' pyspark et sparkr tags - upvotes la plupart de bienvenue]

14voto

MaxU Points 5284

Cette petite fonction d'assistance peut aider à analyser les fichiers de sortie de Spark dans DataFrame:

PySpark:

 from pyspark.sql.functions import *

def read_spark_output(file_path):
    step1 = spark.read \
             .option("header","true") \
             .option("inferSchema","true") \
             .option("delimiter","|") \
             .option("parserLib","UNIVOCITY") \
             .option("ignoreLeadingWhiteSpace","true") \
             .option("ignoreTrailingWhiteSpace","true") \
             .option("comment","+") \
             .csv("file://{}".format(file_path))
    # select not-null columns
    step2 = t.select([c for c in t.columns if not c.startswith("_")])
    # deal with 'null' string in column
    return step2.select(*[when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name) for col_name in step2.columns])
 

Scala:

 // read Spark Output Fixed width table:
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
 

Usage:

 df = read_spark_output("file:///tmp/spark.out")
 

PS: Pour pyspark , eqNullSafe est disponible à partir de spark 2.3 .

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X