From 607eff0edfc10a1473fa9713a0500bf09f105c13 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Tue, 21 Apr 2015 21:44:44 -0700 Subject: [PATCH 1/3] [SPARK-6113] [ML] Small cleanups after original tree API PR This does a few clean-ups. With this PR, all spark.ml tree components have ```private[ml]``` constructors. CC: mengxr Author: Joseph K. Bradley Closes #5567 from jkbradley/dt-api-dt2 and squashes the following commits: 2263b5b [Joseph K. Bradley] Added note about tree example issue. bb9f610 [Joseph K. Bradley] Small cleanups after original tree API PR --- .../examples/ml/DecisionTreeExample.scala | 25 ++++++++++++++----- .../spark/ml/impl/tree/treeParams.scala | 4 +-- .../org/apache/spark/ml/tree/Split.scala | 7 +++--- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala index 921b396e799e..2cd515c89d3d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeExample.scala @@ -44,6 +44,13 @@ import org.apache.spark.sql.{SQLContext, DataFrame} * {{{ * ./bin/run-example ml.DecisionTreeExample [options] * }}} + * Note that Decision Trees can take a large amount of memory. If the run-example command above + * fails, try running via spark-submit and specifying the amount of memory as at least 1g. + * For local mode, run + * {{{ + * ./bin/spark-submit --class org.apache.spark.examples.ml.DecisionTreeExample --driver-memory 1g + * [examples JAR path] [options] + * }}} * If you use it as a template to create your own app, please use `spark-submit` to submit your app. */ object DecisionTreeExample { @@ -70,7 +77,7 @@ object DecisionTreeExample { val parser = new OptionParser[Params]("DecisionTreeExample") { head("DecisionTreeExample: an example decision tree app.") opt[String]("algo") - .text(s"algorithm (Classification, Regression), default: ${defaultParams.algo}") + .text(s"algorithm (classification, regression), default: ${defaultParams.algo}") .action((x, c) => c.copy(algo = x)) opt[Int]("maxDepth") .text(s"max depth of the tree, default: ${defaultParams.maxDepth}") @@ -222,18 +229,23 @@ object DecisionTreeExample { // (1) For classification, re-index classes. val labelColName = if (algo == "classification") "indexedLabel" else "label" if (algo == "classification") { - val labelIndexer = new StringIndexer().setInputCol("labelString").setOutputCol(labelColName) + val labelIndexer = new StringIndexer() + .setInputCol("labelString") + .setOutputCol(labelColName) stages += labelIndexer } // (2) Identify categorical features using VectorIndexer. // Features with more than maxCategories values will be treated as continuous. 
- val featuresIndexer = new VectorIndexer().setInputCol("features") - .setOutputCol("indexedFeatures").setMaxCategories(10) + val featuresIndexer = new VectorIndexer() + .setInputCol("features") + .setOutputCol("indexedFeatures") + .setMaxCategories(10) stages += featuresIndexer // (3) Learn DecisionTree val dt = algo match { case "classification" => - new DecisionTreeClassifier().setFeaturesCol("indexedFeatures") + new DecisionTreeClassifier() + .setFeaturesCol("indexedFeatures") .setLabelCol(labelColName) .setMaxDepth(params.maxDepth) .setMaxBins(params.maxBins) @@ -242,7 +254,8 @@ object DecisionTreeExample { .setCacheNodeIds(params.cacheNodeIds) .setCheckpointInterval(params.checkpointInterval) case "regression" => - new DecisionTreeRegressor().setFeaturesCol("indexedFeatures") + new DecisionTreeRegressor() + .setFeaturesCol("indexedFeatures") .setLabelCol(labelColName) .setMaxDepth(params.maxDepth) .setMaxBins(params.maxBins) diff --git a/mllib/src/main/scala/org/apache/spark/ml/impl/tree/treeParams.scala b/mllib/src/main/scala/org/apache/spark/ml/impl/tree/treeParams.scala index 6f4509f03d03..eb2609faef05 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/impl/tree/treeParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/impl/tree/treeParams.scala @@ -117,7 +117,7 @@ private[ml] trait DecisionTreeParams extends PredictorParams { def setMaxDepth(value: Int): this.type = { require(value >= 0, s"maxDepth parameter must be >= 0. Given bad value: $value") set(maxDepth, value) - this.asInstanceOf[this.type] + this } /** @group getParam */ @@ -283,7 +283,7 @@ private[ml] trait TreeRegressorParams extends Params { def getImpurity: String = getOrDefault(impurity) /** Convert new impurity to old impurity. */ - protected def getOldImpurity: OldImpurity = { + private[ml] def getOldImpurity: OldImpurity = { getImpurity match { case "variance" => OldVariance case _ => diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala index cb940f62990e..708c769087dd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/Split.scala @@ -38,7 +38,7 @@ sealed trait Split extends Serializable { private[tree] def toOld: OldSplit } -private[ml] object Split { +private[tree] object Split { def fromOld(oldSplit: OldSplit, categoricalFeatures: Map[Int, Int]): Split = { oldSplit.featureType match { @@ -58,7 +58,7 @@ private[ml] object Split { * left. Otherwise, it goes right. * @param numCategories Number of categories for this feature. */ -final class CategoricalSplit( +final class CategoricalSplit private[ml] ( override val featureIndex: Int, leftCategories: Array[Double], private val numCategories: Int) @@ -130,7 +130,8 @@ final class CategoricalSplit( * @param threshold If the feature value is <= this threshold, then the split goes left. * Otherwise, it goes right. 
*/ -final class ContinuousSplit(override val featureIndex: Int, val threshold: Double) extends Split { +final class ContinuousSplit private[ml] (override val featureIndex: Int, val threshold: Double) + extends Split { override private[ml] def shouldGoLeft(features: Vector): Boolean = { features(featureIndex) <= threshold From 76cfe4dd8be221db956904aed2e623e167f7352e Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 21 Apr 2015 22:07:04 -0700 Subject: [PATCH 2/3] update impl --- .../org/apache/spark/ml/feature/IDF.scala | 35 +++++++++---------- .../apache/spark/ml/feature/IDFSuite.scala | 22 +++++++----- 2 files changed, 29 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 5a7974123a0a..66046d3fd0d5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -20,26 +20,28 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml._ import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.SchemaUtils import org.apache.spark.mllib.feature import org.apache.spark.mllib.linalg.{Vector, VectorUDT} import org.apache.spark.sql._ import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.StructType /** * Params for [[IDF]] and [[IDFModel]]. */ -private[feature] trait IDFParams extends Params with HasInputCol with HasOutputCol { +private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol { /** * The minimum of documents in which a term should appear. * @group param */ - val minDocFreq = new IntParam( - this, "minDocFreq", "minimum of documents in which a term should appear for filtering", Some(0)) + final val minDocFreq = new IntParam( + this, "minDocFreq", "minimum of documents in which a term should appear for filtering") /** @group getParam */ - def getMinDocFreq: Int = get(minDocFreq) + def getMinDocFreq: Int = getOrDefault(minDocFreq) /** @group setParam */ def setMinDocFreq(value: Int): this.type = set(minDocFreq, value) @@ -48,14 +50,9 @@ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputC * Validate and transform the input schema. */ protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = this.paramMap ++ paramMap - val inputType = schema(map(inputCol)).dataType - require(inputType.isInstanceOf[VectorUDT], - s"Input column ${map(inputCol)} must be a vector column") - require(!schema.fieldNames.contains(map(outputCol)), - s"Output column ${map(outputCol)} already exists.") - val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) - StructType(outputFields) + val map = extractParamMap(paramMap) + SchemaUtils.checkColumnType(schema, map(inputCol), new VectorUDT) + SchemaUtils.appendColumn(schema, map(outputCol), new VectorUDT) } } @@ -64,7 +61,7 @@ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputC * Compute the Inverse Document Frequency (IDF) given a collection of documents. 
*/ @AlphaComponent -class IDF extends Estimator[IDFModel] with IDFParams { +final class IDF extends Estimator[IDFModel] with IDFBase { /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -74,7 +71,7 @@ class IDF extends Estimator[IDFModel] with IDFParams { override def fit(dataset: DataFrame, paramMap: ParamMap): IDFModel = { transformSchema(dataset.schema, paramMap, logging = true) - val map = this.paramMap ++ paramMap + val map = extractParamMap(paramMap) val input = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } val idf = new feature.IDF(getMinDocFreq).fit(input) val model = new IDFModel(this, map, idf) @@ -96,7 +93,7 @@ class IDFModel private[ml] ( override val parent: IDF, override val fittingParamMap: ParamMap, idfModel: feature.IDFModel) - extends Model[IDFModel] with IDFParams { + extends Model[IDFModel] with IDFBase { /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -106,9 +103,9 @@ class IDFModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) - val map = this.paramMap ++ paramMap - val idf: Vector => Vector = (vec) => idfModel.transform(vec) - dataset.withColumn(map(outputCol), callUDF(idf, new VectorUDT, col(map(inputCol)))) + val map = extractParamMap(paramMap) + val idf = udf { vec: Vector => idfModel.transform(vec) } + dataset.withColumn(map(outputCol), idf(col(map(inputCol)))) } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala index e4bec4383a06..de3c191ba6fb 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite -import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} @@ -56,15 +56,17 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { val data = Array( Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), Vectors.dense(0.0, 1.0, 2.0, 3.0), - Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) - ) - val numOfData = data.size + Vectors.sparse(numOfFeatures, Array(1), Array(1.0))) + val numOfData = data.length val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - val idfModel = new IDF().setInputCol("features").setOutputCol("idf_value").fit(dataFrame) + val idfModel = new IDF() + .setInputCol("features") + .setOutputCol("idf_value") + .fit(dataFrame) val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => math.log((numOfData + 1.0) / (x + 1.0)) @@ -80,15 +82,17 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { val data = Array( Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), Vectors.dense(0.0, 1.0, 2.0, 3.0), - Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) - ) - val numOfData = data.size + Vectors.sparse(numOfFeatures, Array(1), Array(1.0))) + val numOfData = data.length val sqlContext = new SQLContext(sc) import 
sqlContext.implicits._ val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - val idfModel = new IDF().setInputCol("features").setOutputCol("idf_value").setMinDocFreq(1) + val idfModel = new IDF() + .setInputCol("features") + .setOutputCol("idf_value") + .setMinDocFreq(1) .fit(dataFrame) val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => From e0af0da2e1b0f93b3383c29488998d75a0d781f9 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Tue, 21 Apr 2015 22:31:24 -0700 Subject: [PATCH 3/3] update tests --- .../apache/spark/ml/feature/IDFSuite.scala | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala index de3c191ba6fb..4f7e062bb8e9 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -26,6 +26,13 @@ import org.apache.spark.sql.{DataFrame, Row, SQLContext} class IDFSuite extends FunSuite with MLlibTestSparkContext { + @transient var sqlContext: SQLContext = _ + + override def beforeAll(): Unit = { + super.beforeAll() + sqlContext = new SQLContext(sc) + } + def getResultFromDF(result: DataFrame): Array[Vector] = { result.select("idf_value").collect().map { case Row(features: Vector) => features @@ -58,23 +65,22 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { Vectors.dense(0.0, 1.0, 2.0, 3.0), Vectors.sparse(numOfFeatures, Array(1), Array(1.0))) val numOfData = data.length + val idf = Vectors.dense(Array(0, 3, 1, 2).map { x => + math.log((numOfData + 1.0) / (x + 1.0)) + }) + val expected = getResultFromVector(data, idf) - val sqlContext = new SQLContext(sc) - import sqlContext.implicits._ - val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") + val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected") val idfModel = new IDF() .setInputCol("features") .setOutputCol("idf_value") - .fit(dataFrame) + .fit(df) - val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => - math.log((numOfData + 1.0) / (x + 1.0)) - }) - - assertValues( - getResultFromDF(idfModel.transform(dataFrame)), - getResultFromVector(data, expectedModel)) + idfModel.transform(df).select("idf_value", "expected").collect().foreach { + case Row(x: Vector, y: Vector) => + assert(x ~== y absTol 1e-5) + } } test("Normalization with setter") {
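
The updated IDFSuite above exercises the builder-style IDF API introduced in patches 2 and 3 (setInputCol / setOutputCol / setMinDocFreq followed by fit and transform). For reference, below is a minimal standalone usage sketch of that API. It is illustrative only: it assumes a live SparkContext `sc` (for example from a Spark shell), the trailing `show()` call is an assumed convenience for inspecting output rather than anything in the patches, and the column names simply mirror the test code.

```scala
import org.apache.spark.ml.feature.IDF
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

// Assumes an existing SparkContext `sc` (e.g. a Spark shell session).
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Toy term-frequency vectors over 4 features, mirroring the test data above.
val data = Seq(
  Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
  Vectors.dense(0.0, 1.0, 2.0, 3.0),
  Vectors.sparse(4, Array(1), Array(1.0)))
val df = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features")

// Fit an IDF model; minDocFreq filters out terms that appear in fewer
// than the given number of documents.
val idfModel = new IDF()
  .setInputCol("features")
  .setOutputCol("idf_value")
  .setMinDocFreq(1)
  .fit(df)

// Rescale the term-frequency vectors by the learned inverse document frequencies.
idfModel.transform(df).select("idf_value").show()
```

With this data, each component of the learned IDF vector is log((m + 1) / (docFreq + 1)) with m = 3 documents, which is exactly what the suite's expected values compute via `math.log((numOfData + 1.0) / (x + 1.0))`.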