81 votes

Comment écrire des tests unitaires dans Spark 2.0+ ?

J'ai essayé de trouver un moyen raisonnable pour tester SparkSession avec le cadre de test JUnit. Bien qu'il semble y avoir de bons exemples de SparkContext je n'ai pas réussi à trouver comment faire fonctionner un exemple correspondant pour SparkSession même s'il est utilisé à plusieurs reprises en interne dans le document spark-testing-base . Je serais heureux d'essayer une solution qui n'utilise pas spark-testing-base aussi bien si ce n'est pas vraiment la bonne façon de procéder ici.

Cas de test simple ( compléter le projet MWE avec build.sbt ) :

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession

class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

Le résultat de cette exécution avec JUnit est un NPE à la ligne de chargement :

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Notez que le fait que le fichier chargé existe ou non ne devrait pas avoir d'importance ; dans un SparkSession correctement configuré, un fichier une erreur plus judicieuse sera déclenchée .

1 votes

Merci à tous pour les réponses reçues jusqu'à présent ; j'espère pouvoir faire le point bientôt. J'ai également ouvert une question et je la renvoie ici : github.com/holdenk/spark-testing-base/issues/180

0 votes

Malheureusement, je n'ai pas encore eu l'occasion d'utiliser Spark ... un jour, peut-être 3.x à ce rythme - sinon je travaillerais à accepter une réponse. Je suis heureux que cela ait été utile à d'autres.

119voto

Vidya Points 10226

Merci d'avoir posé cette question exceptionnelle. Pour une raison quelconque, lorsqu'il s'agit de Spark, tout le monde est tellement pris par l'analyse qu'il oublie les grandes pratiques de génie logiciel qui ont émergé au cours des 15 dernières années environ. C'est pourquoi nous mettons un point d'honneur à discuter des tests et de l'intégration continue (entre autres choses comme DevOps) dans notre cours.

Un bref rappel de la terminologie

A vrai Un test unitaire signifie que vous avez un contrôle total sur chaque composant du test. Il ne peut y avoir aucune interaction avec les bases de données, les appels REST, les systèmes de fichiers, ou même l'horloge du système ; tout doit être "doublé" (c'est-à-dire simulé, bloqué, etc.) comme le dit Gerard Mezaros dans Patrons de test xUnit . Je sais que cela semble être de la sémantique, mais c'est vraiment important. Ne pas comprendre cela est l'une des principales raisons pour lesquelles vous voyez des échecs de test intermittents dans l'intégration continue.

On peut toujours faire des tests unitaires

Donc, étant donné cette compréhension, les tests unitaires et RDD est impossible. Cependant, les tests unitaires ont toujours leur place dans le développement de l'analytique.

Considérons une opération simple :

rdd.map(foo).map(bar)

Ici foo et bar sont des fonctions simples. Elles peuvent être testées à l'unité de la manière habituelle, et elles devraient l'être avec autant de cas de figure que possible. Après tout, pourquoi se préoccuper de l'endroit d'où proviennent les entrées, qu'il s'agisse d'un dispositif de test ou d'un système d'exploitation. RDD ?

N'oubliez pas l'étincelle

Ce n'est pas un test en soi Mais à ces premiers stades, vous devriez également faire des expériences dans le shell Spark pour déterminer vos transformations et surtout les conséquences de votre approche. Par exemple, vous pouvez examiner les plans de requête physiques et logiques, la stratégie de partitionnement et la préservation, ainsi que l'état de vos données à l'aide de nombreuses fonctions différentes telles que toDebugString , explain , glom , show , printSchema et ainsi de suite. Je vous laisse les explorer.

Vous pouvez également régler votre maître sur local[2] dans le shell Spark et dans vos tests afin d'identifier tout problème qui pourrait ne survenir que lorsque vous commencerez à distribuer le travail.

Tests d'intégration avec Spark

Maintenant, passons aux choses amusantes.

Afin de test d'intégration Spark après que vous ayez confiance dans la qualité de vos fonctions d'aide et RDD / DataFrame En ce qui concerne la logique de transformation, il est essentiel de faire quelques choses (indépendamment de l'outil de construction et du cadre de test) :

  • Augmenter la mémoire de la JVM.
  • Active la bifurcation mais désactive l'exécution parallèle.
  • Utilisez votre cadre de test pour accumuler vos tests d'intégration Spark dans des suites, et initialisez le fichier SparkContext avant tous les tests et l'arrêter après tous les tests.

Avec ScalaTest, vous pouvez mélanger dans BeforeAndAfterAll (que je préfère généralement) ou BeforeAndAfterEach comme le fait @ShankarKoirala pour initialiser et démonter les artefacts Spark. Je sais qu'il s'agit d'un endroit raisonnable pour faire une exception, mais je n'aime vraiment pas ces objets mutables. var que vous devez utiliser.

Le modèle de prêt

Une autre approche consiste à utiliser le Types de prêts .

Par exemple (en utilisant ScalaTest) :

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

Comme vous pouvez le constater, le modèle de prêt fait appel à des fonctions d'ordre supérieur pour "prêter" l'objet de l'opération. SparkContext à l'épreuve, puis de s'en débarrasser une fois l'épreuve terminée.

La programmation orientée vers la souffrance (Merci, Nathan)

C'est une question de préférence, mais je préfère utiliser le modèle de prêt et câbler les choses moi-même aussi longtemps que possible avant de faire appel à un autre cadre. En plus d'essayer de rester léger, les frameworks ajoutent parfois beaucoup de "magie" qui rend le débogage des échecs de test difficile à raisonner. Je prends donc une Programmation axée sur la souffrance où j'évite d'ajouter un nouveau cadre jusqu'à ce que la douleur de ne pas l'avoir soit trop dure à supporter. Mais encore une fois, c'est à vous de voir.

Le meilleur choix pour ce cadre alternatif est bien sûr spark-testing-base comme l'a mentionné @ShankarKoirala. Dans ce cas, le test ci-dessus ressemblerait à ceci :

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

Notez que je n'ai pas eu à faire quoi que ce soit pour gérer le SparkContext . SharedSparkContext m'a donné tout ça avec sc comme le SparkContext -- gratuitement. Personnellement, je n'utiliserais pas cette dépendance dans ce seul but, car le Loan Pattern fait exactement ce dont j'ai besoin pour cela. De plus, compte tenu de l'imprévisibilité des systèmes distribués, il peut être très pénible de devoir retracer la magie qui se produit dans le code source d'une bibliothèque tierce lorsque les choses tournent mal lors de l'intégration continue.

Maintenant où spark-testing-base Les aides basées sur Hadoop telles que HDFSClusterLike et YARNClusterLike . Le fait de combiner ces caractéristiques peut vraiment vous épargner bien des soucis de configuration. Un autre endroit où il brille est avec le Scalacheck -comme les propriétés et les générateurs - en supposant bien sûr que vous compreniez comment les tests basés sur les propriétés fonctionnent et pourquoi ils sont utiles. Mais encore une fois, j'hésiterais à l'utiliser jusqu'à ce que mes analyses et mes tests atteignent ce niveau de sophistication.

"Seul un Sith traite avec des absolus." -- Obi-Wan Kenobi

Bien sûr, vous n'êtes pas obligé de choisir l'un ou l'autre. Vous pourriez peut-être utiliser l'approche Loan Pattern pour la plupart de vos tests et spark-testing-base seulement pour quelques tests plus rigoureux. Le choix n'est pas binaire ; vous pouvez faire les deux.

Tests d'intégration avec Spark Streaming

Enfin, je voudrais simplement présenter un extrait de ce à quoi pourrait ressembler une configuration de test d'intégration de SparkStreaming avec des valeurs en mémoire sans spark-testing-base :

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

C'est plus simple qu'il n'y paraît. Il s'agit simplement de transformer une séquence de données en une file d'attente à destination de l'application DStream . La plupart d'entre eux ne sont en fait qu'une configuration passe-partout qui fonctionne avec les API de Spark. Quoi qu'il en soit, vous pouvez le comparer avec StreamingSuiteBase comme on le trouve dans spark-testing-base pour décider de ce que vous préférez.

C'est peut-être mon post le plus long, alors je vais le laisser ici. J'espère que d'autres personnes apporteront d'autres idées pour améliorer la qualité de nos analyses grâce aux mêmes pratiques de génie logiciel agile qui ont amélioré le développement de toutes les autres applications.

Et avec mes excuses pour cette publicité éhontée, vous pouvez consulter notre cours. Analytique avec Apache Spark où nous abordons beaucoup de ces idées et plus encore. Nous espérons avoir bientôt une version en ligne.

0 votes

Merci pour la réponse détaillée, mais l'utilisation du modèle loan vous fera démarrer et arrêter le contexte spark pour chacun des cas de test définis, et la seule façon d'éviter cela est d'utiliser la réponse fournie par koiralo ?

27voto

Shankar Koirala Points 11688

Vous pouvez écrire un test simple avec FunSuite et BeforeAndAfterEach comme ci-dessous

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

Vous n'avez pas besoin de créer une fonction dans le test, vous pouvez simplement l'écrire en tant que

test ("test name") {//implementation and assert}

Holden Karau a rédigé un très bon test spark-testing-base

Vous devez vérifier ci-dessous est un exemple simple

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

J'espère que cela vous aidera !

0 votes

Excellente réponse. Le site spark-spec utilisait une approche similaire, mais elle était trop lente lorsqu'un grand nombre de fichiers de test étaient ajoutés au projet. Voir ma réponse pour une implémentation alternative qui ne force pas le SparkSession à être arrêté / démarré après chaque fichier de test.

2 votes

J'aime aussi la première partie de cette réponse ; j'aurais juste aimé que le deuxième exemple contienne des éléments de Spark au lieu d'une assertion jouet. Au-delà de cela, je voudrais souligner que la notion d'effectuer des effets secondaires coûteux avant et/ou après une suite de tests n'est pas une idée nouvelle. Comme je l'ai suggéré dans ma réponse, ScalaTest dispose d'amples mécanismes pour cela - dans ce cas pour gérer les artefacts Spark - et vous pouvez les utiliser comme vous le feriez pour tout autre dispositif coûteux. Au moins jusqu'à ce que le moment vienne où l'introduction d'un cadre tiers plus lourd en vaille la peine.

0 votes

Par ailleurs, ScalaTest et specs2 (qui, je crois, le fait par défaut) peuvent tous deux exécuter des tests en parallèle pour gagner en vitesse. Les outils de construction peuvent aussi aider. Mais encore une fois, rien de tout cela n'est nouveau.

19voto

Eugene Lopatkin Points 457

Depuis Spark 1.6 vous pourriez utiliser SharedSparkContext ou SharedSQLContext que Spark utilise pour ses propres tests unitaires :

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

Depuis Spark 2.3 SharedSparkSession est disponible :

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

UPDATE :

Dépendance Maven :

<dependency>
  <groupId>org.scalactic</groupId>
  <artifactId>scalactic</artifactId>
  <version>SCALATEST_VERSION</version>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>SCALATEST_VERSION</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

Dépendance vis-à-vis du SBT :

"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

En outre, vous pouvez vérifier sources de test de Spark où il y a un énorme ensemble de combinaisons de test variées.

UPDATE 2 :

Test unitaire Apache Spark Partie 1 - Composants de base

Apache Spark Unit Testing Part 2 - Spark SQL

Test unitaire Apache Spark Partie 3 - Streaming

Test d'intégration d'Apache Spark

1 votes

Savez-vous quel paquet maven contient cette classe ?

0 votes

Bien sûr. Les deux en "org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

0 votes

Pour Maven <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql</artifactId> <version>SPARK_VERSION</version> <type>test-jar</type> <scope>test</scope> </dependency>

15voto

Powers Points 1742

J'aime créer un SparkSessionTestWrapper qui peut être intégré dans les classes de test. L'approche de Shankar fonctionne, mais elle est excessivement lente pour les suites de tests comportant plusieurs fichiers.

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

Le trait peut être utilisé comme suit :

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

Vérifiez le spark-spec pour un exemple concret qui utilise le projet SparkSessionTestWrapper approche.

Mise à jour

Le site bibliothèque spark-testing-base ajoute automatiquement le SparkSession quand certains traits sont mélangés à la classe de test (par exemple quand DataFrameSuiteBase est mélangée, vous aurez accès au SparkSession via la fonction spark variable).

J'ai créé une bibliothèque de test séparée appelée spark-fast-tests pour donner aux utilisateurs le contrôle total du SparkSession lors de l'exécution de leurs tests. Je ne pense pas qu'une bibliothèque d'aide aux tests doive définir le SparkSession. Les utilisateurs devraient pouvoir démarrer et arrêter leur SparkSession comme ils le souhaitent (j'aime créer un seul SparkSession et l'utiliser tout au long de l'exécution de la suite de tests).

Voici un exemple de spark-fast-tests assertSmallDatasetEquality en action :

import com.github.mrpowers.spark.fast.tests.DatasetComparer

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

    it("aliases a DataFrame") {

      val sourceDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("name")

      val actualDF = sourceDF.select(col("name").alias("student"))

      val expectedDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("student")

      assertSmallDatasetEquality(actualDF, expectedDF)

    }

  }

}

1 votes

Dans cette approche, comment recommandez-vous d'ajouter sparkSession.stop() quelque part ?

0 votes

Vous ne devriez pas avoir besoin de sparkSession.stop() @NeilBest. La session Spark sera fermée lorsque l'exécution de la suite de tests sera terminée.

1 votes

Pourquoi ne pas utiliser sparkSession.stop() ? comme la réponse de @Shankar Koirala arrête le sparkSession, est-ce inutile ?

1voto

sunitha Points 563

J'ai pu résoudre le problème avec le code suivant

la dépendance spark-hive est ajoutée dans le pom du projet

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X