I am using a custom data source located here: https://github.com/sourav-mazumder/Data-Science-Extensions/releases
When I work with it locally in a Dockerized Spark environment, it works as expected. However, when I use it on EMR, I get the error mentioned in the title of this question. Below are the EMR configuration options used, the Spark startup messages, and my test code with the output it produces when I run it. I am not sure what else I need to configure.
The configuration is as follows:
[
  {
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ],
    "classification": "spark-env",
    "properties": {}
  },
  {
    "configurations": [
      {
        "classification": "export",
        "properties": {
          "PYSPARK_PYTHON": "/usr/bin/python3"
        }
      }
    ],
    "classification": "yarn-env",
    "properties": {}
  },
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.executor.extraClassPath": "/home/hadoop/*",
      "spark.driver.extraClassPath": "/home/hadoop/*",
      "spark.jars.packages": "org.scalaj:scalaj-http_2.10:2.3.0"
    }
  }
]
There is also a bootstrap step that copies the data source JAR from the link above, which I uploaded to S3, to /home/hadoop on every node of the Spark cluster:
aws s3 cp s3://foo/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar /home/hadoop/spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar
When I SSH into the master node and start a PySpark session (as ec2-user), I see the following messages:
sudo pyspark
Python 3.6.7 (default, Dec 21 2018, 20:31:01)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /home/ec2-user/.ivy2/cache
The jars for the packages stored in: /home/ec2-user/.ivy2/jars
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.scalaj#scalaj-http_2.10 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e;1.0
confs: [default]
found org.scalaj#scalaj-http_2.10;2.3.0 in central
:: resolution report :: resolve 140ms :: artifacts dl 5ms
:: modules in use:
org.scalaj#scalaj-http_2.10;2.3.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-200abadc-d4a6-47dd-a2e9-110f77de3b4e
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/6ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/04/11 19:00:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/04/11 19:00:36 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
19/04/11 19:00:40 WARN Client: Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 3.6.7 (default, Dec 21 2018 20:31:01)
SparkSession available as 'spark'.
I then run my test code:
sodauri = 'https://soda.demo.socrata.com/resource/6yvf-kk3n.json'
sodainput1 = [("Nevada"), ("nn")]
sodainput2 = [("Northern California"), ("pr")]
sodainput3 = [("Virgin Islands region"), ("pr")]
sodainputRdd = spark.sparkContext.parallelize([sodainput1, sodainput2, sodainput3])
sodaDf = sodainputRdd.toDF(["region","source"])
sodaDf.createOrReplaceTempView('sodainputtbl')
prmsSoda = { 'url' : sodauri, 'input' : 'sodainputtbl', 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
sodasDf = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**prmsSoda).load()
[Stage 6:> (0 + 1) / 1]19/04/11 20:34:39 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 8, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: scalaj.http.Http$
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 31 more
19/04/11 20:34:39 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 172, in load
return self._df(self._jreader.load())
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o193.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 11, ip-10-100-14-225.us-west-2.compute.internal, executor 3): java.lang.NoClassDefFoundError: scalaj/http/Http$
at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$.infer(JsonInferSchema.scala:83)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$$anonfun$inferFromDataset$1.apply(JsonDataSource.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource$.inferFromDataset(JsonDataSource.scala:108)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at org.apache.spark.sql.DataFrameReader$$anonfun$2.apply(DataFrameReader.scala:439)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:438)
at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:419)
at org.apache.dsext.spark.datasource.rest.RESTRelation.<init>(RestRelation.scala:101)
at org.apache.dsext.spark.datasource.rest.RestDataSource.createRelation(RestDataSource.scala:42)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: scalaj/http/Http$
at org.apache.dsext.spark.datasource.rest.RestConnectorUtil$.callRestAPI(RestConnectorUtil.scala:53)
at org.apache.dsext.spark.datasource.rest.RESTRelation.org$apache$dsext$spark$datasource$rest$RESTRelation$$callRest(RestRelation.scala:128)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at org.apache.dsext.spark.datasource.rest.RESTRelation$$anonfun$2.apply(RestRelation.scala:100)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:70)
at org.apache.spark.sql.catalyst.json.JsonInferSchema$$anonfun$1.apply(JsonInferSchema.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
>>>
What is wrong with my configuration?
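To narrow this down, it may help to pull the host, executor, and missing class out of the failing log line. The sketch below is plain Python; the helper name `parse_lost_task` is hypothetical, and the regular expression is an assumption modeled only on the "Lost task" line in the trace above, not a general Spark log parser.

```python
import re

# Pattern modeled on the single "Lost task" line in the trace above
# (an assumption, not a documented Spark log format).
LOST_TASK = re.compile(
    r"Lost task (?P<task>[\d.]+) in stage (?P<stage>[\d.]+) "
    r"\(TID (?P<tid>\d+), (?P<host>[^,]+), executor (?P<executor>\d+)\): "
    r"(?P<error>[\w.]+): (?P<detail>\S+)"
)


def parse_lost_task(line):
    """Return a dict describing a 'Lost task' log line, or None if it does not match."""
    match = LOST_TASK.search(line)
    if match is None:
        return None
    info = match.groupdict()
    # The JVM reports the class with '/' separators; convert to the dotted name.
    info["missing_class"] = info["detail"].replace("/", ".")
    return info


sample = (
    "19/04/11 20:34:39 WARN TaskSetManager: Lost task 0.0 in stage 6.0 "
    "(TID 8, ip-10-100-14-225.us-west-2.compute.internal, executor 3): "
    "java.lang.NoClassDefFoundError: scalaj/http/Http$"
)
info = parse_lost_task(sample)
print(info["host"], info["executor"], info["missing_class"])
```

On the line quoted above, this reports executor 3 on a worker host, which suggests the class could not be loaded on the executor side; it does not by itself say whether the driver has the same problem.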
Update
I tried configuring the EMR cluster with scalaj-http_2.11, but I still get the same error. I also rebuilt the plugin, making sure it was built for Scala 2.11.12 and Spark 2.4, the same versions running on EMR. I still get the same error.
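As a sanity check on the Scala versions involved, the `_2.10` / `_2.11` suffixes in the artifact names can be compared mechanically. This is only a sketch: the helper names `scala_binary_version` and `mismatched` are hypothetical, the suffix convention is assumed to hold for every artifact listed, and a clean result would not prove the classpath is correct.

```python
import re

# Scala artifacts conventionally carry a "_<binary version>" suffix, e.g. "_2.11".
SCALA_SUFFIX = re.compile(r"_(\d+\.\d+)(?=[-:.]|$)")


def scala_binary_version(artifact):
    """Extract the Scala binary version from an artifact name, or None if absent."""
    match = SCALA_SUFFIX.search(artifact)
    return match.group(1) if match else None


def mismatched(artifacts, expected):
    """Return the artifacts whose Scala suffix differs from the expected version."""
    return [
        name
        for name in artifacts
        if scala_binary_version(name) not in (None, expected)
    ]


# Names taken from the configuration and bootstrap step in this question.
artifacts = [
    "org.scalaj:scalaj-http_2.10:2.3.0",
    "spark-datasource-rest_2.11-2.1.0-SNAPSHOT.jar",
]

# Scala 2.11 is the version stated above for the EMR cluster.
print(mismatched(artifacts, expected="2.11"))
```

With the original configuration, this flags only the scalaj-http_2.10 coordinate; after switching to scalaj-http_2.11 the list would be empty, which matches the observation that the suffix alone does not explain the error.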
I am now looking into the warning messages in case the problem comes from there:
Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Same path resource file:///home/ec2-user/.ivy2/jars/org.scalaj_scalaj-http_2.10-2.3.0.jar added multiple times to distributed cache.