在Apache Spark中为具有大量列的数据集创build一个mlpipe道的最佳方法

我正在使用Spark 2.1.1来处理具有〜2000特性的数据集,并试图创build一个基本的MLpipe道,由一些变形金刚和一个分类器组成。

我们假设为了简单起见,我正在使用的pipe道包含一个VectorAssembler,StringIndexer和一个Classifier,这将是一个相当常见的用例。

// Pipeline elements val assmbleFeatures: VectorAssembler = new VectorAssembler() .setInputCols(featureColumns) .setOutputCol("featuresRaw") val labelIndexer: StringIndexer = new StringIndexer() .setInputCol("TARGET") .setOutputCol("indexedLabel") // Train a RandomForest model. val rf: RandomForestClassifier = new RandomForestClassifier() .setLabelCol("indexedLabel") .setFeaturesCol("featuresRaw") .setMaxBins(30) // add the params, unique to this classifier val paramGrid = new ParamGridBuilder() .addGrid(rf.numTrees, Array(5)) .addGrid(rf.maxDepth, Array(5)) .build() // Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages. val evaluator = new BinaryClassificationEvaluator() .setMetricName("areaUnderROC") .setLabelCol("indexedLabel") 

如果stream水线步骤分为变压器stream水线(VectorAssembler + StringIndexer)和第二个分类器stream水线,并且在两个stream水线之间放置不必要的列,则培训成功。 这意味着为了重复使用模型,必须在训练之后保存两个PipelineModel,并且必须引入中间预处理步骤。

 // Split indexers and forest in two Pipelines. val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain) // Transform data and drop all columns, except those needed for training val dfTrainT = prePipeline.transform(dfTrain) val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col)) val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*) val mainPipeline = new Pipeline().setStages(Array(rf)) val cv = new CrossValidator() .setEstimator(mainPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel] 

(imho)更清洁的解决scheme是将所有pipe道阶段合并成一个pipe道。

 val pipeline = new Pipeline() .setStages(Array(labelIndexer, assmbleFeatures, rf)) val cv = new CrossValidator() .setEstimator(pipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) // This will fail! val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel] 

但是,将所有PipelineStages放入一个Pipeline会导致以下exception,这可能是由于PR最终将解决的问题:

错误CodeGenerator:编译失败:org.codehaus.janino.JaninoRuntimeException:类org.apache.spark.sql.catalyst.expressions.GeneratedClass的常量池$ SpecificUnsafeProjection已经增长过JVM限制0xFFFF

原因是VectorAssembler有效地使DataFrame中的数据量增加了一倍(在本例中),因为没有可能丢弃不必要的列的变换器。 (查看火花pipe道vector汇编器放下其他列 )

对于golub数据集中的示例,以下预处理步骤是必要的:

 import org.apache.spark.sql.types.DoubleType import org.apache.spark.ml.classification.RandomForestClassifier import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature._ import org.apache.spark.sql._ import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100) // Those steps are necessary, otherwise training would fail either way val colsToDrop = df.columns.take(5000) val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*) // Split df in train and test sets val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3)) // Feature columns are columns except "TARGET" val featureColumns = dfTrain.columns.filter(col => col != "TARGET") 

由于我是Spark新手,我不确定解决此问题的最佳方法是什么。 你会build议…

  1. 创造一个新的变压器,哪些滴列,可以纳入pipe道?
  2. 拆分pipe道并引入中介步骤
  3. 还要别的吗? 🙂

还是我错过了什么重要的(pipe道步骤,公关等),可以解决这个问题?


编辑:

我实现了一个新的Transformer DroppingVectorAssembler ,它删除了不必要的列,但是,抛出了相同的exception。

除此之外,将spark.sql.codegen.wholeStage设置为false并不能解决问题。

janino错误是由于在优化程序进程中创build的常量variables的数量造成的。 JVM中允许的常量variables的最大限制是((2 ^ 16)-1)。 如果这个限制被超过,那么你得到Constant pool for class ... has grown past JVM limit of 0xFFFF

解决这个问题的JIRA是SPARK-18016 ,但是这个时候还在进行中。

VectorAssembler阶段,您的代码很可能会失败,因为在单个优化任务期间,必须针对数千列执行代码。

我为这个问题开发的解决方法是通过对列的子集进行处理来创build一个“向量向量”,然后将结果放在一起以创build一个奇异特征向量。 这可以防止任何单个优化任务超出JVM常量限制。 这不是优雅的,但是我已经在数据集上使用它,达到10k列的范围。

这个方法还允许你保持单一的pipe道,虽然它需要一些额外的步骤来使其工作(创build子向量)。 从子向量创build特征向量后,可以根据需要删除原始来源列。

示例代码:

 // IMPORT DEPENDENCIES import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column} import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.{Pipeline, PipelineModel} // Create first example dataframe val exampleDF = spark.createDataFrame(Seq( (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5), (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8), (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6), (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5), (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3), (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4) )).toDF("uid", "col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "colA", "colB", "colC", "colD", "colE", "colF", "colG", "colH", "colI", "colJ", "colK") // Create multiple column lists using the sliding method val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray // Create a vector assembler for each column list val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec") val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec") val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec") val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec") // Create a vector assembler using column list vectors as input val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features") // Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler)) // Fit and transform the data val featuresDF = pipeline.fit(exampleDF).transform(exampleDF) // Get the number of features in "features" vector val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs")) // Print number of features in "features vector" print(featureLength) 

(注意:创build列表的方法应该是以编程的方式完成的,但为了理解这个概念,我保持这个例子简单。)

你得到的janino错误是因为取决于function集,生成的代码变得更大。

我将这些步骤分成不同的pipe道,并删除不必要的function,保存像StringIndexerOneHotEncoder这样的中间模型,并在预测阶段加载它们,这也是有帮助的,因为对于需要预测的数据来说,转换会更快。

最后,在运行VectorAssembler阶段之后,您不需要保留特征列,因为它将特征转换为feature vectorlabel列,这就是运行预测所需的全部function。

在斯卡拉pipe道的例子与保存的中间步骤 – (旧的火花API)

另外,如果你使用1.6.0之类的老版本火花,你需要检查修补版本,例如2.1.1或2.2.0或1.6.4,否则即使大约有400个特征列,你也会碰到Janino错误。