From 9d1e1b372ef14d0085bea193d0b2dce8cfba5f78 Mon Sep 17 00:00:00 2001
From: Meihua Wu
Date: Tue, 6 Oct 2015 19:55:35 -0700
Subject: [PATCH 1/3] Add weight support for DecisionTree and RandomForest

---
 .../DecisionTreeClassifier.scala              | 24 ++++--
 .../RandomForestClassifier.scala              | 26 ++++--
 .../ml/regression/DecisionTreeRegressor.scala | 23 ++++-
 .../ml/regression/RandomForestRegressor.scala | 25 ++++--
 .../spark/ml/tree/impl/RandomForest.scala     | 83 +++++++++++++++++--
 .../apache/spark/ml/tree/impl/TreePoint.scala | 22 ++---
 .../DecisionTreeClassifierSuite.scala         | 61 ++++++++++++++
 .../RandomForestClassifierSuite.scala         | 50 +++++++++++
 .../org/apache/spark/ml/impl/TreeTests.scala  | 12 +++
 .../DecisionTreeRegressorSuite.scala          | 56 ++++++++++++-
 .../RandomForestRegressorSuite.scala          | 41 +++++++++
 project/MimaExcludes.scala                    | 13 +++
 12 files changed, 396 insertions(+), 40 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index c478aea44ace8..2307c9de1203c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
@@ -18,16 +18,17 @@
 package org.apache.spark.ml.classification
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeClassifierParams}
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{lit, col}
 
 /**
  * :: Experimental ::
@@ -39,7 +40,7 @@ import org.apache.spark.sql.DataFrame
 @Experimental
 final class DecisionTreeClassifier(override val uid: String)
   extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel]
-  with DecisionTreeParams with TreeClassifierParams {
+  with DecisionTreeParams with TreeClassifierParams with HasWeightCol {
 
   def this() = this(Identifiable.randomUID("dtc"))
 
@@ -62,6 +63,15 @@ final class DecisionTreeClassifier(override val uid: String)
 
   override def setImpurity(value: String): this.type = super.setImpurity(value)
 
+  /**
+   * Sets the column with instance weights, which are used to over-/under-weight individual
+   * training instances. If weightCol is empty, all instances are treated equally (weight 1.0).
+   * Default is empty.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override def setSeed(value: Long): this.type = super.setSeed(value)
 
   override protected def train(dataset: DataFrame): DecisionTreeClassificationModel = {
@@ -74,7 +84,11 @@ final class DecisionTreeClassifier(override val uid: String)
         " specified. See StringIndexer.")
       // TODO: Automatically index labels: SPARK-7126
     }
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy = getOldStrategy(categoricalFeatures, numClasses)
     val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = $(seed), parentUID = Some(uid))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index bae329692a68d..544af9bd3d7c6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -18,17 +18,17 @@
 package org.apache.spark.ml.classification
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, RandomForestParams, TreeClassifierParams, TreeEnsembleModel}
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors}
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{col, lit, udf}
 
 
 /**
@@ -41,7 +41,7 @@ import org.apache.spark.sql.functions._
 @Experimental
 final class RandomForestClassifier(override val uid: String)
   extends ProbabilisticClassifier[Vector, RandomForestClassifier, RandomForestClassificationModel]
-  with RandomForestParams with TreeClassifierParams {
+  with RandomForestParams with TreeClassifierParams with HasWeightCol {
 
   def this() = this(Identifiable.randomUID("rfc"))
 
@@ -79,6 +79,15 @@ final class RandomForestClassifier(override val uid: String)
   override def setFeatureSubsetStrategy(value: String): this.type =
     super.setFeatureSubsetStrategy(value)
 
+  /**
+   * Sets the column with instance weights, which are used to over-/under-weight individual
+   * training instances. If weightCol is empty, all instances are treated equally (weight 1.0).
+   * Default is empty.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override protected def train(dataset: DataFrame): RandomForestClassificationModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
@@ -89,7 +98,12 @@ final class RandomForestClassifier(override val uid: String)
         " specified. See StringIndexer.")
       // TODO: Automatically index labels: SPARK-7126
     }
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
     val trees =
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 477030d9ea3ee..8151b7d27af92 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.ml.regression
 
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.{PredictionModel, Predictor}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, TreeRegressorParams}
@@ -27,8 +29,8 @@ import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{lit, col}
 
 /**
  * :: Experimental ::
@@ -40,7 +42,7 @@ import org.apache.spark.sql.DataFrame
 @Experimental
 final class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   extends Predictor[Vector, DecisionTreeRegressor, DecisionTreeRegressionModel]
-  with DecisionTreeParams with TreeRegressorParams {
+  with DecisionTreeParams with TreeRegressorParams with HasWeightCol {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("dtr"))
@@ -73,10 +75,23 @@ final class DecisionTreeRegressor @Since("1.4.0") (@Since("1.4.0") override val
 
   override def setSeed(value: Long): this.type = super.setSeed(value)
 
+  /**
+   * Sets the column with instance weights, which are used to over-/under-weight individual
+   * training instances. If weightCol is empty, all instances are treated equally (weight 1.0).
+   * Default is empty.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override protected def train(dataset: DataFrame): DecisionTreeRegressionModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy = getOldStrategy(categoricalFeatures)
     val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
       seed = $(seed), parentUID = Some(uid))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index 71e40b513ee0a..adbc8264d997b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -18,18 +18,18 @@
 package org.apache.spark.ml.regression
 
 import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.param.shared.HasWeightCol
 import org.apache.spark.ml.{PredictionModel, Predictor}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree.{DecisionTreeModel, RandomForestParams, TreeEnsembleModel, TreeRegressorParams}
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.functions.{col, lit, udf}
 
 
 /**
@@ -41,7 +41,7 @@ import org.apache.spark.sql.functions._
 @Experimental
 final class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String)
   extends Predictor[Vector, RandomForestRegressor, RandomForestRegressionModel]
-  with RandomForestParams with TreeRegressorParams {
+  with RandomForestParams with TreeRegressorParams with HasWeightCol {
 
   @Since("1.4.0")
   def this() = this(Identifiable.randomUID("rfr"))
@@ -89,10 +89,23 @@ final class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val
   override def setFeatureSubsetStrategy(value: String): this.type =
     super.setFeatureSubsetStrategy(value)
 
+  /**
+   * Sets the column with instance weights, which are used to over-/under-weight individual
+   * training instances. If weightCol is empty, all instances are treated equally (weight 1.0).
+   * Default is empty.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override protected def train(dataset: DataFrame): RandomForestRegressionModel = {
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
-    val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val oldDataset = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
+    }
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
     val trees =
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 4a3b12d1440b8..7350039dff06c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 
 import org.apache.spark.Logging
 import org.apache.spark.ml.classification.DecisionTreeClassificationModel
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.regression.DecisionTreeRegressionModel
 import org.apache.spark.ml.tree._
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
@@ -43,11 +44,11 @@ private[ml] object RandomForest extends Logging {
 
   /**
   * Train a random forest.
-   * @param input Training data: RDD of [[org.apache.spark.mllib.regression.LabeledPoint]]
+   * @param input Training data: RDD of [[org.apache.spark.ml.feature.Instance]]
   * @return an unweighted set of trees
   */
  def run(
-      input: RDD[LabeledPoint],
+      input: RDD[Instance],
      strategy: OldStrategy,
      numTrees: Int,
      featureSubsetStrategy: String,
@@ -60,9 +61,8 @@ private[ml] object RandomForest extends Logging {
 
    timer.start("init")
 
-    val retaggedInput = input.retag(classOf[LabeledPoint])
-    val metadata =
-      DecisionTreeMetadata.buildMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy)
+    val retaggedInput = input.retag(classOf[Instance])
+    val metadata = buildWeightedMetadata(retaggedInput, strategy, numTrees, featureSubsetStrategy)
    logDebug("algo = " + strategy.algo)
    logDebug("numTrees = " + numTrees)
    logDebug("seed = " + seed)
@@ -87,8 +87,10 @@ private[ml] object RandomForest extends Logging {
 
    val withReplacement = numTrees > 1
 
-    val baggedInput = BaggedPoint
+    val unadjustedBaggedInput = BaggedPoint
      .convertToBaggedRDD(treeInput, strategy.subsamplingRate, numTrees, withReplacement, seed)
+
+    val baggedInput = reweightSubSampleWeights(unadjustedBaggedInput)
      .persist(StorageLevel.MEMORY_AND_DISK)
 
    // depth of the decision tree
@@ -205,6 +207,47 @@ private[ml] object RandomForest extends Logging {
    }
  }
 
+  /**
+   * Get the node index corresponding to this data point.
+   * This function mimics prediction, passing an example from the root node down to a leaf
+   * or unsplit node; that node's index is returned.
+   *
+   * @param node Node in tree from which to classify the given data point.
+   * @param binnedFeatures Binned feature vector for data point.
+   * @param splits possible splits for all features, indexed (numFeatures)(numSplits)
+   * @return Leaf index if the data point reaches a leaf.
+   *         Otherwise, last node reachable in tree matching this example.
+   *         Note: This is the global node index, i.e., the index used in the tree.
+   *               This index is different from the index used during training a particular
+   *               group of nodes on one call to [[findBestSplits()]].
+   */
+  private def predictNodeIndex(
+      node: LearningNode,
+      binnedFeatures: Array[Int],
+      splits: Array[Array[Split]]): Int = {
+    if (node.isLeaf || node.split.isEmpty) {
+      node.id
+    } else {
+      val split = node.split.get
+      val featureIndex = split.featureIndex
+      val splitLeft = split.shouldGoLeft(binnedFeatures(featureIndex), splits(featureIndex))
+      if (node.leftChild.isEmpty) {
+        // Not yet split. Return index from next layer of nodes to train
+        if (splitLeft) {
+          LearningNode.leftChildIndex(node.id)
+        } else {
+          LearningNode.rightChildIndex(node.id)
+        }
+      } else {
+        if (splitLeft) {
+          predictNodeIndex(node.leftChild.get, binnedFeatures, splits)
+        } else {
+          predictNodeIndex(node.rightChild.get, binnedFeatures, splits)
+        }
+      }
+    }
+  }
+
  /**
   * Helper for binSeqOp, for data which can contain a mix of ordered and unordered features.
   *
@@ -823,7 +866,7 @@
   *          of size (numFeatures, numBins).
   */
  protected[tree] def findSplits(
-      input: RDD[LabeledPoint],
+      input: RDD[Instance],
      metadata: DecisionTreeMetadata,
      seed : Long): Array[Array[Split]] = {
 
@@ -844,7 +887,7 @@
      logDebug("fraction of data used for calculating quantiles = " + fraction)
      input.sample(withReplacement = false, fraction, new XORShiftRandom(seed).nextInt()).collect()
    } else {
-      new Array[LabeledPoint](0)
+      new Array[Instance](0)
    }
 
    val splits = new Array[Array[Split]](numFeatures)
@@ -1171,4 +1214,28 @@
    }
  }
 
+  /**
+   * Incorporate each instance's sample weight into the sub-sample weights of its BaggedPoint.
+   */
+  private[impl] def reweightSubSampleWeights(
+      baggedTreePoints: RDD[BaggedPoint[TreePoint]]): RDD[BaggedPoint[TreePoint]] = {
+    baggedTreePoints.map { bagged =>
+      val treePoint = bagged.datum
+      val adjustedSubSampleWeights = bagged.subsampleWeights.map(w => w * treePoint.weight)
+      new BaggedPoint[TreePoint](treePoint, adjustedSubSampleWeights)
+    }
+  }
+
+  /**
+   * A thin adapter to [[org.apache.spark.mllib.tree.impl.DecisionTreeMetadata.buildMetadata]].
+   */
+  private[impl] def buildWeightedMetadata(
+      input: RDD[Instance],
+      strategy: OldStrategy,
+      numTrees: Int,
+      featureSubsetStrategy: String): DecisionTreeMetadata = {
+    val unweightedInput = input.map { w => LabeledPoint(w.label, w.features) }
+    DecisionTreeMetadata.buildMetadata(unweightedInput, strategy, numTrees, featureSubsetStrategy)
+  }
 }
+
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
index 9fa27e5e1f721..8a3192f881201 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.ml.tree.impl
 
 import org.apache.spark.ml.tree.{ContinuousSplit, Split}
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.impl.DecisionTreeMetadata
 import org.apache.spark.rdd.RDD
 
@@ -37,8 +36,10 @@ import org.apache.spark.rdd.RDD
 * @param label Label from LabeledPoint
 * @param binnedFeatures Binned feature values.
 *                       Same length as LabeledPoint.features, but values are bin indices.
+ * @param weight Weight for this TreePoint
 */
-private[spark] class TreePoint(val label: Double, val binnedFeatures: Array[Int])
+private[spark] class TreePoint(
+    val label: Double, val binnedFeatures: Array[Int], val weight: Double)
  extends Serializable {
}
 
@@ -53,7 +54,7 @@ private[spark] object TreePoint {
   * @return TreePoint dataset representation
   */
  def convertToTreeRDD(
-      input: RDD[LabeledPoint],
+      input: RDD[WeightedLabeledPoint],
      splits: Array[Array[Split]],
      metadata: DecisionTreeMetadata): RDD[TreePoint] = {
    // Construct arrays for featureArity for efficiency in the inner loop.
@@ -83,18 +84,19 @@ private[spark] object TreePoint {
   * for categorical features.
   */
  private def labeledPointToTreePoint(
-      labeledPoint: LabeledPoint,
+      weightedLabeledPoint: WeightedLabeledPoint,
      thresholds: Array[Array[Double]],
      featureArity: Array[Int]): TreePoint = {
-    val numFeatures = labeledPoint.features.size
+    val numFeatures = weightedLabeledPoint.features.size
    val arr = new Array[Int](numFeatures)
    var featureIndex = 0
    while (featureIndex < numFeatures) {
      arr(featureIndex) =
-        findBin(featureIndex, labeledPoint, featureArity(featureIndex), thresholds(featureIndex))
+        findBin(featureIndex, weightedLabeledPoint, featureArity(featureIndex),
+          thresholds(featureIndex))
      featureIndex += 1
    }
-    new TreePoint(labeledPoint.label, arr)
+    new TreePoint(weightedLabeledPoint.label, arr, weightedLabeledPoint.weight)
  }
 
  /**
@@ -107,10 +109,10 @@ private[spark] object TreePoint {
   */
  private def findBin(
      featureIndex: Int,
-      labeledPoint: LabeledPoint,
+      weightedLabeledPoint: WeightedLabeledPoint,
      featureArity: Int,
      thresholds: Array[Double]): Int = {
-    val featureValue = labeledPoint.features(featureIndex)
+    val featureValue = weightedLabeledPoint.features(featureIndex)
 
    if (featureArity == 0) {
      val idx = java.util.Arrays.binarySearch(thresholds, featureValue)
@@ -126,7 +128,7 @@ private[spark] object TreePoint {
          s"DecisionTree given invalid data:" +
          s" Feature $featureIndex is categorical with values in {0,...,${featureArity - 1}," +
          s" but a data point gives it value $featureValue.\n" +
-          " Bad data point: " + labeledPoint.toString)
+          " Bad data point: " + weightedLabeledPoint.toString)
      }
      featureValue.toInt
    }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
index 92b8f84144ab0..f8de9349eaf68 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
@@ -17,10 +17,14 @@
 
 package org.apache.spark.ml.classification
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.{VectorIndexer, StringIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.LeafNode
+import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -275,6 +279,63 @@ class DecisionTreeClassifierSuite extends SparkFunSuite with MLlibTestSparkConte
    val model = dt.fit(df)
  }
 
+  test("training with weighted data") {
+    val (dataset, weightedDataset) = {
+      val testData1 = TreeTests.generateNoisyData(5, 123)
+      val testData2 = TreeTests.generateNoisyData(5, 456)
+
+      // Over-sample the 1st dataset twice.
+      val overSampledTestData1 = testData1.flatMap {
+        labeledPoint => Iterator(labeledPoint, labeledPoint)
+      }
+
+      val rnd = new Random(8392)
+      val weightedTestData1 = testData1.flatMap {
+        case LabeledPoint(label: Double, features: Vector) => {
+          if (rnd.nextGaussian() > 0.0) {
+            Iterator(
+              WeightedLabeledPoint(label, 1.2, features),
+              WeightedLabeledPoint(label, 0.8, features),
+              WeightedLabeledPoint(0.0, 0.0, features))
+          } else {
+            Iterator(
+              WeightedLabeledPoint(label, 0.3, features),
+              WeightedLabeledPoint(1, 0.0, features),
+              WeightedLabeledPoint(label, 1.1, features),
+              WeightedLabeledPoint(label, 0.6, features))
+          }
+        }
+      }
+      val weightedTestData2 = testData2.map {
+        p: LabeledPoint => WeightedLabeledPoint(p.label, 1, p.features)
+      }
+
+      (sqlContext.createDataFrame(sc.parallelize(overSampledTestData1 ++ testData2, 2)),
+        sqlContext.createDataFrame(sc.parallelize(weightedTestData1 ++ weightedTestData2, 2)))
+    }
+
+    val labelIndexer = new StringIndexer()
+      .setInputCol("label")
+      .setOutputCol("indexedLabel")
+      .fit(dataset)
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val dt = new DecisionTreeClassifier()
+      .setLabelCol("indexedLabel")
+      .setFeaturesCol("indexedFeatures")
+
+    val model1 = dt.fit(featureIndexer.transform(labelIndexer.transform(dataset)))
+    val model2 = dt.fit(featureIndexer.transform(labelIndexer.transform(weightedDataset)),
+      dt.weightCol -> "weight")
+
+    TreeTests.checkEqual(model1, model2)
+  }
+
  /////////////////////////////////////////////////////////////////////////////
  // Tests of model save/load
  /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
index deb8ec771cb27..5a2298823eaa7 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
@@ -18,9 +18,12 @@
 package org.apache.spark.ml.classification
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.feature.{IndexToString, VectorIndexer, StringIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.LeafNode
+import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -182,6 +185,53 @@ class RandomForestClassifierSuite extends SparkFunSuite with MLlibTestSparkConte
    assert(mostImportantFeature === 1)
  }
 
+  test("training with weighted data") {
+    val (dataset, testDataset) = {
+      val keyFeature = Vectors.dense(0, 1.0, 2, 1.2)
+      val data0 = Array.fill(20)(WeightedLabeledPoint(0, 0.1, keyFeature))
+      val data1 = Array.fill(10)(WeightedLabeledPoint(1, 20.0, keyFeature))
+
+      val testData = Seq(WeightedLabeledPoint(0, 0.1, keyFeature))
+      (sqlContext.createDataFrame(sc.parallelize(data0 ++ data1, 2)),
+        sqlContext.createDataFrame(sc.parallelize(testData, 2)))
+    }
+
+    val labelIndexer = new StringIndexer()
+      .setInputCol("label")
+      .setOutputCol("indexedLabel")
+      .fit(dataset)
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val rf = new RandomForestClassifier()
+      .setLabelCol("indexedLabel")
+      .setFeaturesCol("indexedFeatures")
+      .setSeed(1)
+
+    val labelConverter = new IndexToString()
+      .setInputCol("prediction")
+      .setOutputCol("predictedLabel")
+      .setLabels(labelIndexer.labels)
+
+    val pipeline = new Pipeline()
+      .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
+
+    val model1 = pipeline.fit(dataset)
+    val model2 = pipeline.fit(dataset, rf.weightCol -> "weight")
+
+    val predDataset1 = model1.transform(testDataset)
+    val predDataset2 = model2.transform(testDataset)
+
+    val prediction1 = predDataset1.select("predictedLabel").head().getString(0)
+    val prediction2 = predDataset2.select("predictedLabel").head().getString(0)
+    assert(prediction1.toDouble === 0.0)
+    assert(prediction2.toDouble === 1.0)
+  }
+
  /////////////////////////////////////////////////////////////////////////////
  // Tests of model save/load
  /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala b/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
index 460849c79f04f..e85fcd87c9329 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.ml.impl
 
+import org.apache.spark.mllib.linalg.Vectors
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkFunSuite
@@ -27,6 +29,8 @@ import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, DataFrame}
 
+import scala.util.Random
+
 
 private[ml] object TreeTests extends SparkFunSuite {
 
@@ -142,4 +146,12 @@ private[ml] object TreeTests extends SparkFunSuite {
    val pred = parentImp.predict
    new InternalNode(pred, parentImp.calculate(), gain, left, right, split, parentImp)
  }
+
+  def generateNoisyData(n: Int, seed: Long): Seq[LabeledPoint] = {
+    val rnd = new Random(seed)
+    Range(0, n).map { i =>
+      LabeledPoint(rnd.nextInt(2),
+        Vectors.dense(rnd.nextInt(4), rnd.nextDouble(), rnd.nextInt(4), rnd.nextDouble()))
+    }
+  }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
index e0d5afa7a7e97..9b6a874b27c9e 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
@@ -17,9 +17,14 @@
 
 package org.apache.spark.ml.regression
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.VectorIndexer
 import org.apache.spark.ml.impl.TreeTests
+import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, DecisionTreeSuite => OldDecisionTreeSuite}
@@ -27,7 +32,6 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 
-
 class DecisionTreeRegressorSuite extends SparkFunSuite with MLlibTestSparkContext {
 
  import DecisionTreeRegressorSuite.compareAPIs
@@ -73,6 +77,56 @@
    MLTestingUtils.checkCopy(model)
  }
 
+  test("training with weighted data") {
+    val (dataset, weightedDataset) = {
+      val testData1 = TreeTests.generateNoisyData(5, 123)
+      val testData2 = TreeTests.generateNoisyData(5, 456)
+
+      // Over-sample the 1st dataset twice.
+      val overSampledTestData1 = testData1.flatMap {
+        labeledPoint => Iterator(labeledPoint, labeledPoint)
+      }
+
+      val rnd = new Random(8392)
+      val weightedTestData1 = testData1.flatMap {
+        case LabeledPoint(label: Double, features: Vector) => {
+          if (rnd.nextGaussian() > 0.0) {
+            Iterator(
+              WeightedLabeledPoint(label, 1.2, features),
+              WeightedLabeledPoint(label, 0.8, features),
+              WeightedLabeledPoint(0.0, 0.0, features))
+          } else {
+            Iterator(
+              WeightedLabeledPoint(label, 0.3, features),
+              WeightedLabeledPoint(1.0, 0.0, features),
+              WeightedLabeledPoint(label, 1.1, features),
+              WeightedLabeledPoint(label, 0.6, features))
+          }
+        }
+      }
+      val weightedTestData2 = testData2.map {
+        p: LabeledPoint => WeightedLabeledPoint(p.label, 1, p.features)
+      }
+
+      (sqlContext.createDataFrame(sc.parallelize(overSampledTestData1 ++ testData2, 2)),
+        sqlContext.createDataFrame(sc.parallelize(weightedTestData1 ++ weightedTestData2, 2)))
+    }
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val dt = new DecisionTreeRegressor()
+      .setFeaturesCol("indexedFeatures")
+
+    val model1 = dt.fit(featureIndexer.transform(dataset))
+    val model2 = dt.fit(featureIndexer.transform(weightedDataset),
+      dt.weightCol -> "weight")
+
+    TreeTests.checkEqual(model1, model2)
+  }
 
  /////////////////////////////////////////////////////////////////////////////
  // Tests of model save/load
  /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index 7e751e4b553b6..079bd3b423f12 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -18,13 +18,17 @@
 package org.apache.spark.ml.regression
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.feature.VectorIndexer
 import org.apache.spark.ml.impl.TreeTests
+import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.{EnsembleTestHelper, RandomForest => OldRandomForest}
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.DataFrame
 
@@ -101,6 +105,43 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
    assert(mostImportantFeature === 1)
  }
 
+  test("training with weighted data") {
+    val (dataset, testDataset) = {
+      val keyFeature = Vectors.dense(0, 1.0, 2, 1.2)
+      val data0 = Array.fill(10)(WeightedLabeledPoint(10, 0.1, keyFeature))
+      val data1 = Array.fill(10)(WeightedLabeledPoint(20, 20.0, keyFeature))
+
+      val testData = Seq(WeightedLabeledPoint(0, 1, keyFeature))
+      (sqlContext.createDataFrame(sc.parallelize(data0 ++ data1, 2)),
+        sqlContext.createDataFrame(sc.parallelize(testData, 2)))
+    }
+
+    val featureIndexer = new VectorIndexer()
+      .setInputCol("features")
+      .setOutputCol("indexedFeatures")
+      .setMaxCategories(4)
+      .fit(dataset)
+
+    val rf = new RandomForestRegressor()
+      .setFeaturesCol("indexedFeatures")
+      .setPredictionCol("predictedLabel")
+      .setSeed(1)
+
+    val pipeline = new Pipeline()
+      .setStages(Array(featureIndexer, rf))
+
+    val model1 = pipeline.fit(dataset)
+    val model2 = pipeline.fit(dataset, rf.weightCol -> "weight")
+
+    val predDataset1 = model1.transform(testDataset)
+    val predDataset2 = model2.transform(testDataset)
+
+    val prediction1 = predDataset1.select("predictedLabel").head().getDouble(0)
+    val prediction2 = predDataset2.select("predictedLabel").head().getDouble(0)
+    assert(prediction1 ~== 15.0 relTol 0.1)
+    assert(prediction2 ~== 20.0 relTol 0.1)
+  }
+
  /////////////////////////////////////////////////////////////////////////////
  // Tests of model save/load
  /////////////////////////////////////////////////////////////////////////////
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dacef911e397e..6679f317a9a81 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -107,6 +107,19 @@ object MimaExcludes {
        "org.apache.spark.sql.SQLContext.setSession"),
      ProblemFilters.exclude[MissingMethodProblem](
        "org.apache.spark.sql.SQLContext.createSession")
+    ) ++ Seq(
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.ml.tree.impl.RandomForest.run"),
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.ml.tree.impl.RandomForest.findSplits"),
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.ml.tree.impl.TreePoint.convertToTreeRDD"),
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.ml.tree.impl.TreePoint.labeledPointToTreePoint"),
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.ml.tree.impl.TreePoint.findBin"),
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.ml.tree.impl.TreePoint.this")
    ) ++ Seq(
      ProblemFilters.exclude[MissingMethodProblem](
        "org.apache.spark.SparkContext.preferredNodeLocationData_="),

From 09d3d4543397e751c3c8cdacef50c678cc6e1401 Mon Sep 17 00:00:00 2001
From: Meihua Wu
Date: Thu, 8 Oct 2015 23:45:29 -0700
Subject: [PATCH 2/3] Refactor WeightedLabeledPoint to ml.feature.Instance

---
 .../ml/regression/DecisionTreeRegressor.scala |  2 +-
 .../apache/spark/ml/tree/impl/TreePoint.scala | 17 +++++++++--------
 .../DecisionTreeClassifierSuite.scala         | 19 +++++++++----------
 .../RandomForestClassifierSuite.scala         |  9 ++++-----
 .../DecisionTreeRegressorSuite.scala          | 19 +++++++++----------
 .../RandomForestRegressorSuite.scala          |  9 ++++-----
 6 files changed, 36 insertions(+), 39 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
index 8151b7d27af92..c549a018afec7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/DecisionTreeRegressor.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.regression
 
+
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.param.shared.HasWeightCol
@@ -26,7 +27,6 @@ import org.apache.spark.ml.tree.{DecisionTreeModel, DecisionTreeParams, Node, Tr
 import org.apache.spark.ml.tree.impl.RandomForest
 import org.apache.spark.ml.util.{Identifiable, MetadataUtils}
 import org.apache.spark.mllib.linalg.Vector
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
 import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
 import org.apache.spark.sql.{Row, DataFrame}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
index 8a3192f881201..2fb04f850d638 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/TreePoint.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.tree.impl
 
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.tree.{ContinuousSplit, Split}
 import org.apache.spark.mllib.tree.impl.DecisionTreeMetadata
 import org.apache.spark.rdd.RDD
@@ -54,7 +55,7 @@ private[spark] object TreePoint {
   * @return TreePoint dataset representation
   */
  def convertToTreeRDD(
-      input: RDD[WeightedLabeledPoint],
+      input: RDD[Instance],
      splits: Array[Array[Split]],
      metadata: DecisionTreeMetadata): RDD[TreePoint] = {
    // Construct arrays for featureArity for efficiency in the inner loop.
@@ -84,19 +85,19 @@ private[spark] object TreePoint {
   * for categorical features.
   */
  private def labeledPointToTreePoint(
-      weightedLabeledPoint: WeightedLabeledPoint,
+      instance: Instance,
      thresholds: Array[Array[Double]],
      featureArity: Array[Int]): TreePoint = {
-    val numFeatures = weightedLabeledPoint.features.size
+    val numFeatures = instance.features.size
    val arr = new Array[Int](numFeatures)
    var featureIndex = 0
    while (featureIndex < numFeatures) {
      arr(featureIndex) =
-        findBin(featureIndex, weightedLabeledPoint, featureArity(featureIndex),
+        findBin(featureIndex, instance, featureArity(featureIndex),
          thresholds(featureIndex))
      featureIndex += 1
    }
-    new TreePoint(weightedLabeledPoint.label, arr, weightedLabeledPoint.weight)
+    new TreePoint(instance.label, arr, instance.weight)
  }
 
  /**
@@ -109,10 +110,10 @@ private[spark] object TreePoint {
   */
  private def findBin(
      featureIndex: Int,
-      weightedLabeledPoint: WeightedLabeledPoint,
+      instance: Instance,
      featureArity: Int,
      thresholds: Array[Double]): Int = {
-    val featureValue = weightedLabeledPoint.features(featureIndex)
+    val featureValue = instance.features(featureIndex)
 
    if (featureArity == 0) {
      val idx = java.util.Arrays.binarySearch(thresholds, featureValue)
@@ -128,7 +129,7 @@ private[spark] object TreePoint {
          s"DecisionTree given invalid data:" +
          s" Feature $featureIndex is categorical with values in {0,...,${featureArity - 1}," +
          s" but a data point gives it value $featureValue.\n" +
-          " Bad data point: " + weightedLabeledPoint.toString)
+          " Bad data point: " + instance.toString)
      }
      featureValue.toInt
    }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
index f8de9349eaf68..5fb80c178d77d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.ml.classification
 
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.{VectorIndexer, StringIndexer}
+import org.apache.spark.ml.feature.{Instance, VectorIndexer, StringIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.LeafNode
-import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -294,20 +293,20 @@ class DecisionTreeClassifierSuite extends SparkFunSuite with MLlibTestSparkConte
      case LabeledPoint(label: Double, features: Vector) => {
        if (rnd.nextGaussian() > 0.0) {
          Iterator(
-            WeightedLabeledPoint(label, 1.2, features),
-            WeightedLabeledPoint(label, 0.8, features),
-            WeightedLabeledPoint(0.0, 0.0, features))
+            Instance(label, 1.2, features),
+            Instance(label, 0.8, features),
+            Instance(0.0, 0.0, features))
        } else {
          Iterator(
-            WeightedLabeledPoint(label, 0.3, features),
-            WeightedLabeledPoint(1, 0.0, features),
-            WeightedLabeledPoint(label, 1.1, features),
-            WeightedLabeledPoint(label, 0.6, features))
+            Instance(label, 0.3, features),
+            Instance(1, 0.0, features),
+            Instance(label, 1.1, features),
+            Instance(label, 0.6, features))
        }
      }
    }
    val weightedTestData2 = testData2.map {
-      p: LabeledPoint => WeightedLabeledPoint(p.label, 1, p.features)
+      p: LabeledPoint => Instance(p.label, 1, p.features)
    }
 
    (sqlContext.createDataFrame(sc.parallelize(overSampledTestData1 ++ testData2, 2)),
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
index 5a2298823eaa7..e15a5bfd05b70 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
@@ -19,11 +19,10 @@ package org.apache.spark.ml.classification
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.Pipeline
-import org.apache.spark.ml.feature.{IndexToString, VectorIndexer, StringIndexer}
+import org.apache.spark.ml.feature.{Instance, IndexToString, VectorIndexer, StringIndexer}
 import org.apache.spark.ml.impl.TreeTests
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.LeafNode
-import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -188,10 +187,10 @@ class RandomForestClassifierSuite extends SparkFunSuite with MLlibTestSparkConte
  test("training with weighted data") {
    val (dataset, testDataset) = {
      val keyFeature = Vectors.dense(0, 1.0, 2, 1.2)
-      val data0 = Array.fill(20)(WeightedLabeledPoint(0, 0.1, keyFeature))
-      val data1 = Array.fill(10)(WeightedLabeledPoint(1, 20.0, keyFeature))
+      val data0 = Array.fill(20)(Instance(0, 0.1, keyFeature))
+      val data1 = Array.fill(10)(Instance(1, 20.0, keyFeature))
 
-      val testData = Seq(WeightedLabeledPoint(0, 0.1, keyFeature))
+      val testData = Seq(Instance(0, 0.1, keyFeature))
      (sqlContext.createDataFrame(sc.parallelize(data0 ++ data1, 2)),
        sqlContext.createDataFrame(sc.parallelize(testData, 2)))
    }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
index 9b6a874b27c9e..3e7ab84ef4282 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/DecisionTreeRegressorSuite.scala
@@ -20,9 +20,8 @@ package org.apache.spark.ml.regression
 
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.feature.VectorIndexer
+import org.apache.spark.ml.feature.{Instance, VectorIndexer}
 import org.apache.spark.ml.impl.TreeTests
-import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -92,20 +91,20 @@ class DecisionTreeRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
      case LabeledPoint(label: Double, features: Vector) => {
        if (rnd.nextGaussian() > 0.0) {
          Iterator(
-            WeightedLabeledPoint(label, 1.2, features),
-            WeightedLabeledPoint(label, 0.8, features),
-            WeightedLabeledPoint(0.0, 0.0, features))
+            Instance(label, 1.2, features),
+            Instance(label, 0.8, features),
+            Instance(0.0, 0.0, features))
        } else {
          Iterator(
-            WeightedLabeledPoint(label, 0.3, features),
-            WeightedLabeledPoint(1.0, 0.0, features),
-            WeightedLabeledPoint(label, 1.1, features),
-            WeightedLabeledPoint(label, 0.6, features))
+            Instance(label, 0.3, features),
+            Instance(1.0, 0.0, features),
+            Instance(label, 1.1, features),
+            Instance(label, 0.6, features))
        }
      }
    }
    val weightedTestData2 = testData2.map {
-      p: LabeledPoint => WeightedLabeledPoint(p.label, 1, p.features)
+      p: LabeledPoint => Instance(p.label, 1, p.features)
    }
 
    (sqlContext.createDataFrame(sc.parallelize(overSampledTestData1 ++ testData2, 2)),
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index 079bd3b423f12..3fa03cb1c7b33 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -19,9 +19,8 @@ package org.apache.spark.ml.regression
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.Pipeline
-import org.apache.spark.ml.feature.VectorIndexer
+import org.apache.spark.ml.feature.{Instance, VectorIndexer}
 import org.apache.spark.ml.impl.TreeTests
-import org.apache.spark.ml.tree.impl.WeightedLabeledPoint
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
@@ -108,10 +107,10 @@ class RandomForestRegressorSuite extends SparkFunSuite with MLlibTestSparkContex
  test("training with weighted data") {
    val (dataset, testDataset) = {
      val keyFeature = Vectors.dense(0, 1.0, 2, 1.2)
-      val data0 = Array.fill(10)(WeightedLabeledPoint(10, 0.1, keyFeature))
-      val data1 = Array.fill(10)(WeightedLabeledPoint(20, 20.0, keyFeature))
+      val data0 = Array.fill(10)(Instance(10, 0.1, keyFeature))
+      val data1 = Array.fill(10)(Instance(20, 20.0, keyFeature))
 
-      val testData = Seq(WeightedLabeledPoint(0, 1, keyFeature))
+      val testData = Seq(Instance(0, 1, keyFeature))
      (sqlContext.createDataFrame(sc.parallelize(data0 ++ data1, 2)),
        sqlContext.createDataFrame(sc.parallelize(testData, 2)))
    }

From 32f4548a22aaf2079dabef0743342d81bef7750f Mon Sep 17 00:00:00 2001
From: Meihua Wu
Date: Fri, 6 Nov 2015 18:56:35 -0800
Subject: [PATCH 3/3] fixed rebase conflict.
---
 .../spark/ml/tree/impl/RandomForest.scala     | 41 ------------------
 .../org/apache/spark/ml/impl/TreeTests.scala  |  9 ++--
 2 files changed, 3 insertions(+), 47 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 7350039dff06c..9ce0205588e66 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -207,47 +207,6 @@ private[ml] object RandomForest extends Logging {
    }
  }
 
-  /**
-   * Get the node index corresponding to this data point.
-   * This function mimics prediction, passing an example from the root node down to a leaf
-   * or unsplit node; that node's index is returned.
-   *
-   * @param node Node in tree from which to classify the given data point.
-   * @param binnedFeatures Binned feature vector for data point.
-   * @param splits possible splits for all features, indexed (numFeatures)(numSplits)
-   * @return Leaf index if the data point reaches a leaf.
-   *         Otherwise, last node reachable in tree matching this example.
-   *         Note: This is the global node index, i.e., the index used in the tree.
-   *               This index is different from the index used during training a particular
-   *               group of nodes on one call to [[findBestSplits()]].
-   */
-  private def predictNodeIndex(
-      node: LearningNode,
-      binnedFeatures: Array[Int],
-      splits: Array[Array[Split]]): Int = {
-    if (node.isLeaf || node.split.isEmpty) {
-      node.id
-    } else {
-      val split = node.split.get
-      val featureIndex = split.featureIndex
-      val splitLeft = split.shouldGoLeft(binnedFeatures(featureIndex), splits(featureIndex))
-      if (node.leftChild.isEmpty) {
-        // Not yet split. Return index from next layer of nodes to train
-        if (splitLeft) {
-          LearningNode.leftChildIndex(node.id)
-        } else {
-          LearningNode.rightChildIndex(node.id)
-        }
-      } else {
-        if (splitLeft) {
-          predictNodeIndex(node.leftChild.get, binnedFeatures, splits)
-        } else {
-          predictNodeIndex(node.rightChild.get, binnedFeatures, splits)
-        }
-      }
-    }
-  }
-
  /**
   * Helper for binSeqOp, for data which can contain a mix of ordered and unordered features.
   *
diff --git a/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala b/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
index e85fcd87c9329..9c69b6a91d41c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/impl/TreeTests.scala
@@ -17,20 +17,17 @@
 
 package org.apache.spark.ml.impl
 
-import org.apache.spark.mllib.linalg.Vectors
-
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.ml.attribute.{AttributeGroup, NominalAttribute, NumericAttribute}
 import org.apache.spark.ml.tree._
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SQLContext, DataFrame}
-
-import scala.util.Random
-
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
 private[ml] object TreeTests extends SparkFunSuite {
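
A brief usage sketch of the API this series adds. It is illustrative only: it
assumes the three patches above are applied and that a SQLContext named
sqlContext is in scope; the column names are arbitrary choices for the example.

  import org.apache.spark.ml.regression.DecisionTreeRegressor
  import org.apache.spark.mllib.linalg.Vectors

  // Each row carries (label, weight, features). The weight scales the row's
  // contribution to the impurity statistics used when choosing splits.
  val training = sqlContext.createDataFrame(Seq(
    (1.0, 1.0, Vectors.dense(0.0, 1.1)),    // ordinary row (weight 1.0)
    (2.0, 2.5, Vectors.dense(2.0, 1.0)),    // counts roughly 2.5 times
    (9.0, 0.0, Vectors.dense(9.9, 9.9)))    // weight 0.0: effectively ignored
  ).toDF("label", "weight", "features")

  val dt = new DecisionTreeRegressor()
    .setWeightCol("weight")  // the default "" treats every row as weight 1.0

  val model = dt.fit(training)
  // Equivalently, as the new test suites do, the weight column can be passed
  // as a fit-time param: dt.fit(training, dt.weightCol -> "weight")

As reweightSubSampleWeights in patch 1 shows, each instance's weight is
multiplied into the sub-sample weights of its BaggedPoint, so instance
weighting composes with the subsampling and bagging used by random forests.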