18 votes

Pourquoi l'application Spark échoue-t-elle avec "ClassNotFoundException: Failed to find data source: kafka" en tant qu'uber-jar avec sbt assembly?

Je suis en train d'essayer d'exécuter un exemple comme https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala. J'ai commencé avec le guide de programmation Spark Structured Streaming à http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.

Mon code est

package io.boontadata.spark.job1

import org.apache.spark.sql.SparkSession

object DirectKafkaAggregateEvents {
  val FIELD_MESSAGE_ID = 0
  val FIELD_DEVICE_ID = 1
  val FIELD_TIMESTAMP = 2
  val FIELD_CATEGORY = 3
  val FIELD_MEASURE1 = 4
  val FIELD_MEASURE2 = 5

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(s"""
        |Usage: DirectKafkaAggregateEvents   
        |   est une liste d'un ou plusieurs courtiers Kafka
        |   valeur d'exemple: subscribe
        |   est une liste d'un ou plusieurs sujets kafka à consommer
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(bootstrapServers, subscribeType, topics) = args

    val spark = SparkSession
      .builder
      .appName("boontadata-spark-job1")
      .getOrCreate()

    import spark.implicits._

    // Créer un DataSet représentant le flux de lignes d'entrée depuis kafka
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]

    // Générer le décompte de mots en cours
    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

    // Démarrer l'exécution de la requête qui imprime les comptes en cours dans la console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }

}

J'ai ajouté les fichiers sbt suivants:

build.sbt:

name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"

// Exclusion des META-INF
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

J'ai également ajouté project/assembly.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Cela crée un jar Uber avec les jars non fournis.

Je soumets avec la ligne suivante:

spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic

mais je reçois cette erreur d'exécution:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
        at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
        at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
        ... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook

Y a-t-il un moyen de savoir quelle classe n'est pas trouvée afin que je puisse rechercher cette classe sur le dépôt maven.org.

Le code source de lookupDataSource semble être à la ligne 543 à https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala mais je n'ai pas pu trouver de lien direct avec la source de données Kafka...

Le code source complet est ici: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f

13voto

Sree Eedupuganti Points 231

J'ai essayé comme ça, ça fonctionne pour moi. Soumettez de cette manière et faites-moi savoir une fois que vous rencontrez des problèmes

./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar

12voto

Jacek Laskowski Points 6668

Le problème est la section suivante dans build.sbt:

// META-INF discard
assemblyMergeStrategy in assembly := { 
   {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
   }
}

Cela signifie que toutes les entrées META-INF doivent être supprimées, y compris le "code" qui permet aux alias des sources de données (par exemple, kafka) de fonctionner.

Pourtant, les fichiers META-INF sont très importants pour que kafka (et d'autres alias de sources de données de streaming) fonctionne.

Pour que l'alias kafka fonctionne, Spark SQL utilise META-INF/services/org.apache.spark.sql.sources.DataSourceRegister avec l'entrée suivante:

org.apache.spark.sql.kafka010.KafkaSourceProvider

KafkaSourceProvider est responsable de l'enregistrement de l'alias kafka avec la bonne source de données en streaming, c'est-à-dire KafkaSource.

Juste pour vérifier que le véritable code est effectivement disponible, mais que le "code" qui enregistre l'alias ne l'est pas, vous pourriez utiliser la source de données kafka en utilisant le nom entièrement qualifié (et non l'alias) comme suit:

spark.readStream.
  format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
  load

Vous verrez d'autres problèmes dus à des options manquantes comme kafka.bootstrap.servers, mais... nous nous écartons du sujet.

Une solution est de MergeStrategy.concat tous les META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (ce qui créerait un uber-jar avec toutes les sources de données, y compris la source de données kafka).

case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat

3voto

ssice Points 854

Dans mon cas, j'ai également obtenu cette erreur lors de la compilation avec sbt, et la cause était que sbt assembly n'incluait pas l'artefact spark-sql-kafka-0-10_2.11 dans le fat jar.

(Je serais très reconnaissant pour des commentaires ici. La dépendance n'a pas été spécifiée à une portée, donc il ne faut pas supposer qu'elle est "fournie").

J'ai donc changé pour déployer un jar normal (mince) et inclure les dépendances avec les paramètres --jars pour spark-submit.

Pour rassembler toutes les dépendances au même endroit, vous pouvez ajouter retrieveManaged := true à vos paramètres de projet sbt, ou vous pouvez, dans la console sbt, exécuter :

>set retrieveManaged := true
>package

Cela devrait apporter toutes les dépendances dans le dossier lib_managed.

Ensuite, vous pouvez copier tous ces fichiers (avec une commande bash, vous pouvez par exemple utiliser quelque chose comme ceci

cd /chemin/vers/votre/projet

LISTEJARS=$(find lib_managed -name '*.jar'| paste -sd , -)

spark-submit [autres-args] target/votre-app-1.0-SNAPSHOT.jar --jars "$LISTEJARS"

1voto

dalin qin Points 96

Je suis en train d'utiliser spark 2.1 et je rencontre le même problème ma solution temporaire est

1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

2) cd ~/.ivy2/jars ici vous avez ,tous les fichiers jars nécessaires sont dans ce dossier maintenant

3) copiez tous les jars dans ce dossier sur tous les nœuds (vous pouvez créer un dossier spécifique pour les contenir)

4) ajoutez le nom du dossier à spark.driver.extraClassPath et spark.driver.extraClassPath ,par exemple spark.driver.extraClassPath=/opt/jars/*:vos_autres_jars

5 spark-submit --class ClasseNom --Autres-Options VotreJar.jar fonctionne maintenant sans problème

0voto

Raghav Points 696

J'ai résolu le problème en téléchargeant le fichier jar sur le système de pilotes. Ensuite, j'ai fourni le jar à spark submit avec l'option --jar.

Il convient également de noter que j'emballais tout l'environnement spark 2.1 dans mon uber jar (puisque mon cluster est toujours sur la version 1.6.1). Pour une raison quelconque, il n'est pas détecté lorsqu'il est inclus dans l'uber jar.

spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class ClassNm --Other-Options YourJar.jar

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X