Je travaille avec Spark 2.1.1 sur un jeu de données avec ~2000 caractéristiques et j'essaie de créer un pipeline ML de base, composé de quelques transformateurs et d'un classificateur.
Supposons, pour simplifier, que le pipeline avec lequel je travaille se compose d'un assembleur de vecteurs, d'un indexeur de chaînes et d'un classificateur, ce qui serait un cas d'utilisation assez courant.
// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("featuresRaw")
val labelIndexer: StringIndexer = new StringIndexer()
.setInputCol("TARGET")
.setOutputCol("indexedLabel")
// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featuresRaw")
.setMaxBins(30)
// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(5))
.addGrid(rf.maxDepth, Array(5))
.build()
// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setLabelCol("indexedLabel")
Si les étapes du pipeline sont séparées en un pipeline de transformation (VectorAssembler + StringIndexer) et un second pipeline de classification, et si les colonnes inutiles sont supprimées entre les deux pipelines, la formation réussit. Cela signifie que pour réutiliser les modèles, deux PipelineModels doivent être sauvegardés après la formation et une étape intermédiaire de prétraitement doit être introduite.
// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)
val mainPipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator()
.setEstimator(mainPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
La solution (imho) beaucoup plus propre serait de fusionner toutes les étapes du pipeline en un seul pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, assmbleFeatures, rf))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
// This will fail!
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]
Cependant, le fait de mettre toutes les PipelineStages dans une seule Pipeline conduit à l'exception suivante, probablement due à ce problème ce Les relations publiques finiront par résoudre le problème :
ERROR CodeGenerator : échec de la compilation : org.codehaus.janino.JaninoRuntimeException : Le pool de constantes de la classe org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection a dépassé la limite JVM de 0xFFFF.
La raison en est que le VectorAssembler double effectivement (dans cet exemple) la quantité de données dans le DataFrame, puisqu'il n'y a pas de transformateur qui pourrait supprimer les colonnes inutiles. (Voir assembleur de vecteurs du pipeline spark drop other columns )
L'exemple fonctionne sur le jeu de données golub et les étapes de prétraitement suivantes sont nécessaires :
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)
// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)
// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))
// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")
Comme je suis nouveau dans Spark, je ne suis pas sûr de la meilleure façon de résoudre ce problème. Suggérez-vous...
pour créer un nouveau transformateur, qui laisse tomber les colonnes et qui peut être incorporé dans le pipeline ?- diviser les deux pipelines et introduire l'étape intermédiaire
- autre chose ? :)
Ou est-ce que je rate quelque chose d'important (étapes du pipeline, PR, etc.) qui permettrait de résoudre ce problème ?
Edit :
J'ai implémenté un nouveau Transformateur DroppingVectorAssembler
qui supprime les colonnes inutiles, mais la même exception est levée.
En outre, la mise en place spark.sql.codegen.wholeStage
à false
ne résout pas le problème.