2 votes

Custom data source plugin on EMR throwing java.lang.NoClassDefFoundError: scalaj/http/Http

I'm using a custom data source located here: https://github.com/sourav-mazumder/Data-Science-Extensions/releases

When I work with this locally using a Dockerized Spark environment, it works as expected. However, when I use it on EMR, I get the error mentioned in the title of this question. Below are the EMR configuration options used, the Spark startup messages, and the test code and its output when I run it. I'm not sure what else I need to configure.

(Screenshot: AWS EMR console configuration)

The configuration is as follows:

[
  {
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ],
    "classification": "spark-env",
    "properties": {}
  },
  {
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ],
    "classification": "yarn-env",
    "properties": {}
  },
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.executor.extraClassPath": "/home/hadoop/*",
      "spark.driver.extraClassPath": "/home/hadoop/*",
      "spark.jars.packages": "org.scalaj:scalaj-http_2.10:2.3.0"
    }
  }
]

There is also a bootstrap step to copy the data source JAR available at the link above, which I uploaded to S3, to /home/hadoop on each node of the Spark cluster:

aws s3 cp s3://foo/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar /home/hadoop/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar
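To confirm that this copy actually lands on every node, here is a quick check that can be run from the PySpark shell (a sketch; it assumes nothing beyond the /home/hadoop path used above):

def list_home_hadoop(_):
    import os  # imported on the worker
    # Report what actually ended up in /home/hadoop on whichever node this partition runs on
    return [", ".join(sorted(os.listdir("/home/hadoop")))]

# Several partitions so the tasks spread across the executors/nodes
print(spark.sparkContext.parallelize(range(8), 8).mapPartitions(list_home_hadoop).collect())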

When I SSH into the master node and start a PySpark session (as ec2-user), I see the following messages:

sudo pyspark 
Python 3.6.7 (default, Dec 21 2018, 20:31:01) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.scalaj#scalaj-http_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e;1.0
    confs: [default]
    found org.scalaj#scalaj-http_2.10;2.3.0 in central
:: resolution report :: resolve 140ms :: artifacts dl 5ms
    :: modules in use:
    org.scalaj#scalaj-http_2.10;2.3.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e
    confs: [default]
    0 artifacts copied, 1 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/04/11 19:00:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/04/11 19:00:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/04/11 19:00:40 WARN Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.6.7 (default, Dec 21 2018 20:31:01)
SparkSession available as 'spark'.

I then run my test code:

# SODA endpoint that the REST data source will call
sodauri = 'https://soda.demo.socrata.com/resource/6yvf-kk3n.json'
# Each row supplies one set of query parameters (region, source) for the REST calls
sodainput1 = [("Nevada"), ("nn")]
sodainput2 = [("Northern California"), ("pr")]
sodainput3 = [("Virgin Islands region"), ("pr")]
sodainputRdd = spark.sparkContext.parallelize([sodainput1, sodainput2, sodainput3])
sodaDf = sodainputRdd.toDF(["region","source"])
# The data source reads its input parameters from this temp view (the 'input' option below)
sodaDf.createOrReplaceTempView('sodainputtbl')
prmsSoda = { 'url' : sodauri, 'input' : 'sodainputtbl', 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
sodasDf = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**prmsSoda).load()
[Stage 6:>                                                          (0 + 1) / 1]19/04/11 20:34:39 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
    at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scalaj.http.Http$
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 31 more

19/04/11 20:34:39 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o193.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 11, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
    at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
    at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
    at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:438)
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:419)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.<init>(RestRelation.scala:101)
    at org.apache.dsext.spark.datasource.rest.RestDataSource.createRelation(RestDataSource.scala:42)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scalaj/http/Http$
    at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
    at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
>>> 

What is wrong with my configuration?

Update

I've tried configuring the EMR cluster with scalaj-http_2.11, but I still get the same error. I also rebuilt the plugin, making sure it was built against Scala 2.11.12 and Spark 2.4, the same versions running on EMR. I still get the same error.
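For reference, the rebuild was along these lines (a sketch; it assumes the project's sbt build, with the Spark 2.4 dependency set in build.sbt):

sbt ++2.11.12 clean package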

I'm currently looking at the warning messages in case the problem is coming from there:

Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.

Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.
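One way to narrow this down is to check whether the driver JVM can see the scalaj class at all (a rough sketch that relies on the internal spark._jvm handle; the executors, where the task actually fails, still need the jar on their own classpath regardless):

# Does the driver JVM classloader see scalaj.http.Http$ ?
# py4j raises a Py4JJavaError if Class.forName fails
try:
    spark._jvm.java.lang.Class.forName("scalaj.http.Http$")
    print("scalaj.http.Http$ is visible on the driver classpath")
except Exception as exc:
    print("scalaj.http.Http$ NOT visible on the driver:", exc)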

1 vote

LaserJesus Points 1288

I got it working.

It seems the packages configuration wasn't doing the job. Instead, I copied the scalaj-http_2.11-2.3.0 jar from the Maven repository into the same directory the custom data source plugin was copied to. It now works (with the spark.jars.packages entry removed from the EMR configuration).
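Concretely, the extra copy can mirror the bootstrap step already used for the plugin jar (a sketch; the bucket is the placeholder from the question and the jar is the scalaj-http_2.11 2.3.0 artifact from Maven Central):

aws s3 cp s3://foo/scalaj-http_2.11-2.3.0.jar /home/hadoop/scalaj-http_2.11-2.3.0.jar

With spark.driver.extraClassPath and spark.executor.extraClassPath still pointing at /home/hadoop/*, both the driver and the executors then pick the jar up locally.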
