41 votes

Méthode optimale pour créer un pipeline ml dans Apache Spark pour un ensemble de données avec un nombre élevé de colonnes.

Je travaille avec Spark 2.1.1 sur un jeu de données avec ~2000 caractéristiques et j'essaie de créer un pipeline ML de base, composé de quelques transformateurs et d'un classificateur.

Supposons, pour simplifier, que le pipeline avec lequel je travaille se compose d'un assembleur de vecteurs, d'un indexeur de chaînes et d'un classificateur, ce qui serait un cas d'utilisation assez courant.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

Si les étapes du pipeline sont séparées en un pipeline de transformation (VectorAssembler + StringIndexer) et un second pipeline de classification, et si les colonnes inutiles sont supprimées entre les deux pipelines, la formation réussit. Cela signifie que pour réutiliser les modèles, deux PipelineModels doivent être sauvegardés après la formation et une étape intermédiaire de prétraitement doit être introduite.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

La solution (imho) beaucoup plus propre serait de fusionner toutes les étapes du pipeline en un seul pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

Cependant, le fait de mettre toutes les PipelineStages dans une seule Pipeline conduit à l'exception suivante, probablement due à ce problème ce Les relations publiques finiront par résoudre le problème :

ERROR CodeGenerator : échec de la compilation : org.codehaus.janino.JaninoRuntimeException : Le pool de constantes de la classe org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection a dépassé la limite JVM de 0xFFFF.

La raison en est que le VectorAssembler double effectivement (dans cet exemple) la quantité de données dans le DataFrame, puisqu'il n'y a pas de transformateur qui pourrait supprimer les colonnes inutiles. (Voir assembleur de vecteurs du pipeline spark drop other columns )

L'exemple fonctionne sur le jeu de données golub et les étapes de prétraitement suivantes sont nécessaires :

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Comme je suis nouveau dans Spark, je ne suis pas sûr de la meilleure façon de résoudre ce problème. Suggérez-vous...

  1. pour créer un nouveau transformateur, qui laisse tomber les colonnes et qui peut être incorporé dans le pipeline ?
  2. diviser les deux pipelines et introduire l'étape intermédiaire
  3. autre chose ? :)

Ou est-ce que je rate quelque chose d'important (étapes du pipeline, PR, etc.) qui permettrait de résoudre ce problème ?


Edit :

J'ai implémenté un nouveau Transformateur DroppingVectorAssembler qui supprime les colonnes inutiles, mais la même exception est levée.

En outre, la mise en place spark.sql.codegen.wholeStage à false ne résout pas le problème.

2voto

Ramandeep Nanda Points 509

El janino que vous obtenez est due au fait qu'en fonction de l'ensemble des fonctionnalités, le code généré devient plus grand.

Je séparerais les étapes en différents pipelines et laisserais tomber les fonctionnalités inutiles, en sauvant les modèles intermédiaires tels que StringIndexer y OneHotEncoder et les charger pendant l'étape de prédiction, ce qui est également utile car les transformations seraient plus rapides pour les données qui doivent être prédites.

Enfin, il n'est pas nécessaire de conserver les colonnes de caractéristiques après avoir exécuté la fonction VectorAssembler stade où il transforme les caractéristiques en un feature vector y label et c'est tout ce dont vous avez besoin pour exécuter des prédictions.

Exemple de pipeline en Scala avec sauvegarde des étapes intermédiaires - (ancienne API spark)

De plus, si vous utilisez une ancienne version de spark comme la 1.6.0, vous devez vérifier la version corrigée, c'est-à-dire la 2.1.1 ou la 2.2.0 ou la 1.6.4, sinon vous rencontrerez le problème suivant Janino erreur, même avec environ 400 colonnes de caractéristiques.

1voto

JamCon Points 2046

El janino L'erreur est due au nombre de variables constantes créées au cours du processus d'optimisation. La limite maximale de variables constantes autorisée dans la JVM est ((2^16) -1). Si cette limite est dépassée, vous obtenez l'erreur suivante Constant pool for class ... has grown past JVM limit of 0xFFFF

Le JIRA qui résoudra ce problème est SPARK-18016 mais c'est toujours en cours à l'heure actuelle.

Il est fort probable que votre code échoue lors de la VectorAssembler lorsqu'il doit traiter des milliers de colonnes au cours d'une seule tâche d'optimisation.

La solution que j'ai mise au point pour résoudre ce problème consiste à créer un "vecteur de vecteurs" en travaillant sur des sous-ensembles de colonnes, puis en rassemblant les résultats à la fin pour créer un vecteur caractéristique singulier. Cela permet d'éviter qu'une seule tâche d'optimisation ne dépasse la limite des constantes de la JVM. Ce n'est pas élégant, mais je l'ai utilisé sur des ensembles de données atteignant les 10 000 colonnes.

Cette méthode vous permet également de conserver un seul pipeline, bien qu'elle nécessite quelques étapes supplémentaires pour la faire fonctionner (création des sous-vecteurs). Une fois que vous avez créé le vecteur de caractéristiques à partir des sous-vecteurs, vous pouvez supprimer les colonnes sources d'origine si vous le souhaitez.

Exemple de code :

// IMPORT DEPENDENCIES
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Create first example dataframe
val exampleDF = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
  (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
  (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
)).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
        "col6", "col7", "col8", "col9", "colA", "colB", 
        "colC", "colD", "colE", "colF", "colG", "colH", 
        "colI", "colJ", "colK")

// Create multiple column lists using the sliding method
val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray

// Create a vector assembler for each column list
val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")

// Create a vector assembler using column list vectors as input
val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")

// Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))

// Fit and transform the data
val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)

// Get the number of features in "features" vector
val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))

// Print number of features in "features vector"
print(featureLength)

(Note : La méthode de création des listes de colonnes devrait vraiment être faite par programme, mais j'ai gardé cet exemple simple pour la compréhension du concept).

Prograide.com

Prograide est une communauté de développeurs qui cherche à élargir la connaissance de la programmation au-delà de l'anglais.
Pour cela nous avons les plus grands doutes résolus en français et vous pouvez aussi poser vos propres questions ou résoudre celles des autres.

Powered by:

X