diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index c478aea44ace8..2307c9de1203c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -18,16 +18,17 @@ package org.apache.spark.ml.classification
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeClassifierParams}
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{lit, col}
 
 /**
  * :: Experimental ::
@@ -39,7 +40,7 @@ import org.apache.spark.sql.DataFrame
 @Experimental
 final class DecisionTreeClassifier(override val uid: String)
   extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel]
-  with DecisionTreeParams with TreeClassifierParams {
+  with DecisionTreeParams with TreeClassifierParams with HasWeightCol {
 
   def this() = this(Identifiable.randomUID("dtc"))
 
@@ -62,6 +63,15 @@ final class DecisionTreeClassifier(override val uid: String)
 
   override def setImpurity(value: String): this.type = super.setImpurity(value)
 
+  /**
+   * Whether to over-/under-sample training instances according to the given weights in weightCol.
+   * If empty, all instances are treated equally (weight 1.0).
+   * Default is empty, so all instances have weight one.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override def setSeed(value: Long): this.type = super.setSeed(value)
 
   override protected def train(dataset: DataFrame): DecisionTreeClassificationModel = {
@@ -74,7 +84,11 @@ final class DecisionTreeClassifier(override val uid: String)
         " specified. See StringIndexer.")
       // TODO: Automatically index labels: SPARK-7126
     }
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy = getOldStrategy(categoricalFeatures, numClasses)
     val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = $(seed), parentUID = Some(uid))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index bae329692a68d..544af9bd3d7c6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -18,17 +18,17 @@ package org.apache.spark.ml.classification
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, RandomForestParams, TreeClassifierParams, TreeEnsembleModel}
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{col, lit, udf}
 
 
 /**
@@ -41,7 +41,7 @@ import org.apache.spark.sql.functions._
 @Experimental
 final class RandomForestClassifier(override val uid: String)
   extends ProbabilisticClassifier[Vector, RandomForestClassifier, RandomForestClassificationModel]
-  with RandomForestParams with TreeClassifierParams {
+  with RandomForestParams with TreeClassifierParams with HasWeightCol {
 
   def this() = this(Identifiable.randomUID("rfc"))
 
@@ -79,6 +79,15 @@ final class RandomForestClassifier(override val uid: String)
   override def setFeatureSubsetStrategy(value: String): this.type =
     super.setFeatureSubsetStrategy(value)
 
+  /**
+   * Whether to over-/under-sample training instances according to the given weights in weightCol.
+   * If empty, all instances are treated equally (weight 1.0).
+   * Default is empty, so all instances have weight one.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override protected def train(dataset: DataFrame): RandomForestClassificationModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
@@ -89,7 +98,12 @@ final class RandomForestClassifier(override val uid: String)
         " specified. See StringIndexer.")
      // TODO: Automatically index labels: SPARK-7126
     }
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
     val trees =
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 477030d9ea3ee..c549a018afec7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -17,18 +17,20 @@ package org.apache.spark.ml.regression
 
+
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.{PredictionModel, Predictor}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeRegressorParams}
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{lit, col}
 
 /**
  * :: Experimental ::
@@ -40,7 +42,7 @@ import org.apache.spark.sql.DataFrame
 @Experimental
 final class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   extends Predictor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel]
-  with DecisionTreeParams with TreeRegressorParams {
+  with DecisionTreeParams with TreeRegressorParams with HasWeightCol {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("dtr"))
@@ -73,10 +75,23 @@ final class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val
 
   override def setSeed(value: Long): this.type = super.setSeed(value)
 
+  /**
+   * Whether to over-/under-sample training instances according to the given weights in weightCol.
+   * If empty, all instances are treated equally (weight 1.0).
+   * Default is empty, so all instances have weight one.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override protected def train(dataset: DataFrame): DecisionTreeRegressionModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy = getOldStrategy(categoricalFeatures)
     val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = $(seed), parentUID = Some(uid))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 71e40b513ee0a..adbc8264d997b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -18,18 +18,18 @@ package org.apache.spark.ml.regression
 
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.{PredictionModel, Predictor}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, RandomForestParams, TreeEnsembleModel, TreeRegressorParams}
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{col, lit, udf}
 
 
 /**
@@ -41,7 +41,7 @@ import org.apache.spark.sql.functions._
 @Experimental
 final class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   extends Predictor[Vector, RandomForestRegressor, RandomForestRegressionModel]
-  with RandomForestParams with TreeRegressorParams {
+  with RandomForestParams with TreeRegressorParams with HasWeightCol {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("rfr"))
 
@@ -89,10 +89,23 @@ final class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val
   override def setFeatureSubsetStrategy(value: String): this.type =
     super.setFeatureSubsetStrategy(value)
 
+  /**
+   * Whether to over-/under-sample training instances according to the given weights in weightCol.
+   * If empty, all instances are treated equally (weight 1.0).
+   * Default is empty, so all instances have weight one.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override protected def train(dataset: DataFrame): RandomForestRegressionModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
     val trees =
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 4a3b12d1440b8..9ce0205588e66 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 
 import org.apache.spark.Logging
 import org.apache.spark.ml.classification.DecisionTreeClassificationModel
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.regression.DecisionTreeRegressionModel
 import org.apache.spark.ml.tree._
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
@@ -43,11 +44,11 @@ private[ml] object RandomForest extends Logging {
 
   /**
    * Train a random forest.
-   * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param input Training data: RDD of [[org.apache.spark.ml.feature.Instance]]
    * @return an unweighted set of trees
    */
   def run(
-      input: RDD[LabeledPoint],
+      input: RDD[Instance],
       strategy: OldStrategy,
       numTrees: Int,
       featureSubsetStrategy: String,
@@ -60,9 +61,8 @@ private[ml] object RandomForest extends Logging {
 
     timer.start("init")
 
-    val retaggedInput = input.retag(classOf[LabeledPoint])
-    val metadata =
-      DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy)
+    val retaggedInput = input.retag(classOf[Instance])
+    val metadata = buildWeightedMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy)
     logDebug("algo = " + strategy.algo)
     logDebug("numTrees = " + numTrees)
     logDebug("seed = " + seed)
@@ -87,8 +87,10 @@ private[ml] object RandomForest extends Logging {
 
     val withReplacement = numTrees > 1
 
-    val baggedInput = BaggedPoint
+    val unadjustedBaggedInput = BaggedPoint
       .convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, withReplacement, seed)
+
+    val baggedInput = reweightSubSampleWeights(unadjustedBaggedInput)
       .persist(StorageLevel.MEMORY_AND_DISK)
 
     // depth of the decision tree
@@ -823,7 +825,7 @@ private[ml] object RandomForest extends Logging {
    *          of size (numFeatures, numBins).
    */
   protected[tree] def findSplits(
-      input: RDD[LabeledPoint],
+      input: RDD[Instance],
       metadata: DecisionTreeMetadata,
       seed : Long): Array[Array[Split]] = {
 
@@ -844,7 +846,7 @@ private[ml] object RandomForest extends Logging {
       logDebug("fraction of data used for calculating quantiles = " + fraction)
       input.sample(withReplacement = false, fraction, new XORShiftRandom(seed).nextInt()).collect()
     } else {
-      new Array[LabeledPoint](0)
+      new Array[Instance](0)
     }
 
     val splits = new Array[Array[Split]](numFeatures)
@@ -1171,4 +1173,28 @@ private[ml] object RandomForest extends Logging {
     }
   }
 
+  /**
+   * Inject the instance weights into the sub-sample weights of the bagged points.
+   */
+  private[impl] def reweightSubSampleWeights(
+      baggedTreePoints: RDD[BaggedPoint[TreePoint]]): RDD[BaggedPoint[TreePoint]] = {
+    baggedTreePoints.map { bagged =>
+      val treePoint = bagged.datum
+      val adjustedSubSampleWeights = bagged.subsampleWeights.map(w => w * treePoint.weight)
+      new BaggedPoint[TreePoint](treePoint, adjustedSubSampleWeights)
+    }
+  }
+
+  /**
+   * A thin adapter around [[org.apache.spark.mllib.tree.impl.DecisionTreeMetadata.buildMetadata]].
+   */
+  private[impl] def buildWeightedMetadata(
+      input: RDD[Instance],
+      strategy: OldStrategy,
+      numTrees: Int,
+      featureSubsetStrategy: String): DecisionTreeMetadata = {
+    val unweightedInput = input.map { w => LabeledPoint(w.label, w.features) }
+    DecisionTreeMetadata.buildMetadata(unweightedInput, strategy, numTrees, featureSubsetStrategy)
+  }
 }
+
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
index 9fa27e5e1f721..2fb04f850d638 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
@@ -17,8 +17,8 @@ package org.apache.spark.ml.tree.impl
 
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.tree.{ContinuousSplit, Split}
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.impl.DecisionTreeMetadata
 import org.apache.spark.rdd.RDD
 
 
@@ -37,8 +37,10 @@ import org.apache.spark.rdd.RDD
  * @param label  Label from LabeledPoint
  * @param binnedFeatures  Binned feature values.
  *                        Same length as LabeledPoint.features, but values are bin indices.
+ * @param weight  Instance weight for this TreePoint
  */
-private[spark] class TreePoint(val label: Double, val binnedFeatures: Array[Int])
+private[spark] class TreePoint(
+    val label: Double, val binnedFeatures: Array[Int], val weight: Double)
   extends Serializable {
 }
 
@@ -53,7 +55,7 @@ private[spark] object TreePoint {
    * @return  TreePoint dataset representation
    */
   def convertToTreeRDD(
-      input: RDD[LabeledPoint],
+      input: RDD[Instance],
       splits: Array[Array[Split]],
       metadata: DecisionTreeMetadata): RDD[TreePoint] = {
     // Construct arrays for featureArity for efficiency in the inner loop.
@@ -83,18 +85,19 @@ private[spark] object TreePoint {
    *                      for categorical features.
    */
   private def labeledPointToTreePoint(
-      labeledPoint: LabeledPoint,
+      instance: Instance,
       thresholds: Array[Array[Double]],
       featureArity: Array[Int]): TreePoint = {
-    val numFeatures = labeledPoint.features.size
+    val numFeatures = instance.features.size
     val arr = new Array[Int](numFeatures)
     var featureIndex = 0
     while (featureIndex < numFeatures) {
       arr(featureIndex) =
-        findBin(featureIndex, labeledPoint, featureArity(featureIndex), thresholds(featureIndex))
+        findBin(featureIndex, instance, featureArity(featureIndex),
+          thresholds(featureIndex))
       featureIndex += 1
     }
-    new TreePoint(labeledPoint.label, arr)
+    new TreePoint(instance.label, arr, instance.weight)
   }
 
   /**
@@ -107,10 +110,10 @@ private[spark] object TreePoint {
    */
   private def findBin(
       featureIndex: Int,
-      labeledPoint: LabeledPoint,
+      instance: Instance,
       featureArity: Int,
       thresholds: Array[Double]): Int = {
-    val featureValue = labeledPoint.features(featureIndex)
+    val featureValue = instance.features(featureIndex)
 
     if (featureArity == 0) {
       val idx = java.util.Arrays.binarySearch(thresholds, featureValue)
@@ -126,7 +129,7 @@ private[spark] object TreePoint {
           s"DecisionTree given invalid data:" +
           s" Feature $featureIndex is categorical with values in {0,...,${featureArity - 1}," +
           s" but a data point gives it value $featureValue.\n" +
-          "  Bad data point: " + labeledPoint.toString)
+          "  Bad data point: " + instance.toString)
       }
       featureValue.toInt
     }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
index 92b8f84144ab0..5fb80c178d77d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
@@ -17,7 +17,10 @@ package org.apache.spark.ml.classification
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.{Instance, VectorIndexer, StringIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.LeafNode
@@ -275,6 +278,63 @@ class DecisionTreeClassifierSuite extends SparkFunSuite with MLlibTestSparkConte
     val model = dt.fit(df)
   }
 
+  test("training with weighted data") {
+    val (dataset, weightedDataset) = {
+      val testData1 = TreeTests.generateNoisyData(5, 123)
+      val testData2 = TreeTests.generateNoisyData(5, 456)
+
+      // Over-sample the first dataset twice.
+      val overSampledTestData1 = testData1.flatMap {
+        labeledPoint => Iterator(labeledPoint, labeledPoint)
+      }
+
+      val rnd = new Random(8392)
+      val weightedTestData1 = testData1.flatMap {
+        case LabeledPoint(label: Double, features: Vector) => {
+          if (rnd.nextGaussian() > 0.0) {
+            Iterator(
+              Instance(label, 1.2, features),
+              Instance(label, 0.8, features),
+              Instance(0.0, 0.0, features))
+          } else {
+            Iterator(
+              Instance(label, 0.3, features),
+              Instance(1.0, 0.0, features),
+              Instance(label, 1.1, features),
+              Instance(label, 0.6, features))
+          }
+        }
+      }
+      val weightedTestData2 = testData2.map {
+        p: LabeledPoint => Instance(p.label, 1.0, p.features)
+      }
+
+      (sqlContext.createDataFrame(sc.parallelize(overSampledTestData1 ++ testData2, 2)),
+        sqlContext.createDataFrame(sc.parallelize(weightedTestData1 ++ weightedTestData2, 2)))
+    }
+
+    val labelIndexer = new StringIndexer()
+      .setInputCol("label")
+      .setOutputCol("indexedLabel")
+      .fit(dataset)
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val dt = new DecisionTreeClassifier()
+      .setLabelCol("indexedLabel")
+      .setFeaturesCol("indexedFeatures")
+
+    val model1 = dt.fit(featureIndexer.transform(labelIndexer.transform(dataset)))
+    val model2 = dt.fit(featureIndexer.transform(labelIndexer.transform(weightedDataset)),
+      dt.weightCol -> "weight")
+
+    TreeTests.checkEqual(model1, model2)
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
index deb8ec771cb27..e15a5bfd05b70 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
@@ -18,6 +18,8 @@ package org.apache.spark.ml.classification
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.feature.{Instance, IndexToString, VectorIndexer, StringIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.LeafNode
@@ -182,6 +184,53 @@ class RandomForestClassifierSuite extends SparkFunSuite with MLlibTestSparkConte
     assert(mostImportantFeature === 1)
   }
 
+  test("training with weighted data") {
+    val (dataset, testDataset) = {
+      val keyFeature = Vectors.dense(0, 1.0, 2, 1.2)
+      val data0 = Array.fill(20)(Instance(0, 0.1, keyFeature))
+      val data1 = Array.fill(10)(Instance(1, 20.0, keyFeature))
+
+      val testData = Seq(Instance(0, 0.1, keyFeature))
+      (sqlContext.createDataFrame(sc.parallelize(data0 ++ data1, 2)),
+        sqlContext.createDataFrame(sc.parallelize(testData, 2)))
+    }
+
+    val labelIndexer = new StringIndexer()
+      .setInputCol("label")
+      .setOutputCol("indexedLabel")
+      .fit(dataset)
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val rf = new RandomForestClassifier()
+      .setLabelCol("indexedLabel")
+      .setFeaturesCol("indexedFeatures")
+      .setSeed(1)
+
+    val labelConverter = new IndexToString()
+      .setInputCol("prediction")
+      .setOutputCol("predictedLabel")
+      .setLabels(labelIndexer.labels)
+
+    val pipeline = new Pipeline()
+      .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
+
+    val model1 = pipeline.fit(dataset)
+    val model2 = pipeline.fit(dataset, rf.weightCol -> "weight")
+
+    val predDataset1 = model1.transform(testDataset)
+    val predDataset2 = model2.transform(testDataset)
+
+    val prediction1 = predDataset1.select("predictedLabel").head().getString(0)
+    val prediction2 = predDataset2.select("predictedLabel").head().getString(0)
+    assert(prediction1.toDouble === 0.0)
+    assert(prediction2.toDouble === 1.0)
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala b/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
index 460849c79f04f..9c69b6a91d41c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
@@ -18,15 +18,16 @@ package org.apache.spark.ml.impl
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.ml.attribute.{AttributeGroup, NominalAttribute, NumericAttribute}
 import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, DataFrame}
-
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 private[ml] object TreeTests extends SparkFunSuite {
@@ -142,4 +143,12 @@ private[ml] object TreeTests extends SparkFunSuite {
     val pred = parentImp.predict
     new InternalNode(pred, parentImp.calculate(), gain, left, right, split, parentImp)
   }
+
+  def generateNoisyData(n: Int, seed: Long): Seq[LabeledPoint] = {
+    val rnd = new Random(seed)
+    Range(0, n).map { i =>
+      LabeledPoint(rnd.nextInt(2),
+        Vectors.dense(rnd.nextInt(4), rnd.nextDouble(), rnd.nextInt(4), rnd.nextDouble()))
+    }
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
index e0d5afa7a7e97..3e7ab84ef4282 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
@@ -17,9 +17,13 @@ package org.apache.spark.ml.regression
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.{Instance, VectorIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.util.MLTestingUtils
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, DecisionTreeSuite => OldDecisionTreeSuite}
@@ -27,7 +31,6 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 
-
 class DecisionTreeRegressorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import DecisionTreeRegressorSuite.compareAPIs
@@ -73,6 +76,56 @@ class DecisionTreeRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     MLTestingUtils.checkCopy(model)
   }
 
+  test("training with weighted data") {
+    val (dataset, weightedDataset) = {
+      val testData1 = TreeTests.generateNoisyData(5, 123)
+      val testData2 = TreeTests.generateNoisyData(5, 456)
+
+      // Over-sample the first dataset twice.
+      val overSampledTestData1 = testData1.flatMap {
+        labeledPoint => Iterator(labeledPoint, labeledPoint)
+      }
+
+      val rnd = new Random(8392)
+      val weightedTestData1 = testData1.flatMap {
+        case LabeledPoint(label: Double, features: Vector) => {
+          if (rnd.nextGaussian() > 0.0) {
+            Iterator(
+              Instance(label, 1.2, features),
+              Instance(label, 0.8, features),
+              Instance(0.0, 0.0, features))
+          } else {
+            Iterator(
+              Instance(label, 0.3, features),
+              Instance(1.0, 0.0, features),
+              Instance(label, 1.1, features),
+              Instance(label, 0.6, features))
+          }
+        }
+      }
+      val weightedTestData2 = testData2.map {
+        p: LabeledPoint => Instance(p.label, 1.0, p.features)
+      }
+
+      (sqlContext.createDataFrame(sc.parallelize(overSampledTestData1 ++ testData2, 2)),
+        sqlContext.createDataFrame(sc.parallelize(weightedTestData1 ++ weightedTestData2, 2)))
+    }
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val dt = new DecisionTreeRegressor()
+      .setFeaturesCol("indexedFeatures")
+
+    val model1 = dt.fit(featureIndexer.transform(dataset))
+    val model2 = dt.fit(featureIndexer.transform(weightedDataset),
+      dt.weightCol -> "weight")
+
+    TreeTests.checkEqual(model1, model2)
+  }
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index 7e751e4b553b6..3fa03cb1c7b33 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -18,6 +18,8 @@ package org.apache.spark.ml.regression
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.feature.{Instance, VectorIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.Vectors
@@ -25,6 +27,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.{EnsembleTestHelper, RandomForest => OldRandomForest}
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 
@@ -101,6 +104,43 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
     assert(mostImportantFeature === 1)
   }
 
+  test("training with weighted data") {
+    val (dataset, testDataset) = {
+      val keyFeature = Vectors.dense(0, 1.0, 2, 1.2)
+      val data0 = Array.fill(10)(Instance(10, 0.1, keyFeature))
+      val data1 = Array.fill(10)(Instance(20, 20.0, keyFeature))
+
+      val testData = Seq(Instance(0, 1, keyFeature))
+      (sqlContext.createDataFrame(sc.parallelize(data0 ++ data1, 2)),
+        sqlContext.createDataFrame(sc.parallelize(testData, 2)))
+    }
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val rf = new RandomForestRegressor()
+      .setFeaturesCol("indexedFeatures")
+      .setPredictionCol("predictedLabel")
+      .setSeed(1)
+
+    val pipeline = new Pipeline()
+      .setStages(Array(featureIndexer, rf))
+
+    val model1 = pipeline.fit(dataset)
+    val model2 = pipeline.fit(dataset, rf.weightCol -> "weight")
+
+    val predDataset1 = model1.transform(testDataset)
+    val predDataset2 = model2.transform(testDataset)
+
+    val prediction1 = predDataset1.select("predictedLabel").head().getDouble(0)
+    val prediction2 = predDataset2.select("predictedLabel").head().getDouble(0)
+    assert(prediction1 ~== 15.0 relTol 0.1)
+    assert(prediction2 ~== 20.0 relTol 0.1)
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dacef911e397e..6679f317a9a81 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -107,6 +107,19 @@ object MimaExcludes {
           "org.apache.spark.sql.SQLContext.setSession"),
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.sql.SQLContext.createSession")
+      ) ++ Seq(
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.tree.impl.RandomForest.run"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.tree.impl.RandomForest.findSplits"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.tree.impl.TreePoint.convertToTreeRDD"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.tree.impl.TreePoint.labeledPointToTreePoint"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.tree.impl.TreePoint.findBin"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.tree.impl.TreePoint.this")
       ) ++ Seq(
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.SparkContext.preferredNodeLocationData_="),
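
For context, a minimal usage sketch of the weightCol parameter this patch introduces. The sketch is not part of the patch; the toy DataFrame, the "weight" column name, and the pre-existing sqlContext are illustrative assumptions, mirroring how the test suites above drive the new setter.

    import org.apache.spark.ml.regression.DecisionTreeRegressor
    import org.apache.spark.mllib.linalg.Vectors

    // Illustrative data only: each row is (label, weight, features).
    val training = sqlContext.createDataFrame(Seq(
      (1.0, 1.0, Vectors.dense(0.0, 1.1)),
      (2.0, 2.0, Vectors.dense(2.0, 1.0)),
      (2.0, 0.5, Vectors.dense(2.0, 1.3))
    )).toDF("label", "weight", "features")

    // With weightCol set, each instance's contribution to the split statistics is
    // scaled by its weight; leaving the param at its default ("") treats every
    // instance as having weight 1.0.
    val dt = new DecisionTreeRegressor()
      .setWeightCol("weight")

    val model = dt.fit(training)

The regressor is used here only because it needs no label indexing; for the classifiers, the label column must still carry class metadata (for example via StringIndexer), exactly as the error message in train() and the weighted-data tests above require.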