diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index a25e625a4017..3b08040e270c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -17,11 +17,17 @@
 
 package org.apache.spark.mllib.tree.model
 
+import scala.collection.mutable
+
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.tree.configuration.{Algo, FeatureType}
 import org.apache.spark.mllib.tree.configuration.Algo._
+import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 
 /**
  * :: Experimental ::
@@ -31,7 +37,7 @@ import org.apache.spark.rdd.RDD
  * @param algo algorithm type -- classification or regression
  */
 @Experimental
-class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable {
+class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable with Saveable {
 
   /**
    * Predict values for a single data point using the model trained.
@@ -98,4 +104,276 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
     header + topNode.subtreeToString(2)
   }
 
+  override def save(sc: SparkContext, path: String): Unit = {
+    DecisionTreeModel.SaveLoadV1_0.save(sc, path, this)
+  }
+
+  override protected def formatVersion: String = "1.0"
+}
+
+object DecisionTreeModel extends Loader[DecisionTreeModel] {
+
+  /**
+   * Iterator which does a DFS traversal (left to right) of a decision tree.
+   *
+   * Note: This is private[mllib] to permit unit tests.
+   */
+  private[mllib] class NodeIterator(model: DecisionTreeModel) extends Iterator[Node] {
+
+    /**
+     * LIFO stack of Nodes used during our DFS.
+     * The top Node is returned by next().
+     * Any Node on the stack is either a leaf or has children that we have not yet visited.
+     * This is empty once all Nodes have been traversed.
+     */
+    val nodeTrace: mutable.Stack[Node] = new mutable.Stack[Node]()
+
+    nodeTrace.push(model.topNode)
+
+    override def hasNext: Boolean = nodeTrace.nonEmpty
+
+    /**
+     * Produces the next element of this iterator.
+     * If [[hasNext]] is false, then this throws an exception.
+     */
+    override def next(): Node = {
+      if (nodeTrace.isEmpty) {
+        throw new NoSuchElementException(
+          "DecisionTreeModel.NodeIterator.next() was called, but no more elements remain.")
+      }
+      val n = nodeTrace.pop()
+      if (!n.isLeaf) {
+        // n is a parent
+        nodeTrace.push(n.rightNode.get, n.leftNode.get)
+      }
+      n
+    }
+  }
+
+  private[tree] object SaveLoadV1_0 {
+
+    def thisFormatVersion = "1.0"
+
+    // Hard-code class name string in case it changes in the future
+    def thisClassName = "org.apache.spark.mllib.tree.DecisionTreeModel"
+
+    case class PredictData(predict: Double, prob: Double)
+
+    object PredictData {
+      def apply(p: Predict): PredictData = PredictData(p.predict, p.prob)
+    }
+
+    case class InformationGainStatsData(
+        gain: Double,
+        impurity: Double,
+        leftImpurity: Double,
+        rightImpurity: Double,
+        leftPredict: PredictData,
+        rightPredict: PredictData)
+
+    object InformationGainStatsData {
+      def apply(i: InformationGainStats): InformationGainStatsData = {
+        InformationGainStatsData(i.gain, i.impurity, i.leftImpurity, i.rightImpurity,
+          PredictData(i.leftPredict), PredictData(i.rightPredict))
+      }
+    }
+
+    case class SplitData(
+        feature: Int,
+        threshold: Double,
+        featureType: Int,
+        categories: Seq[Double]) // TODO: Change to List once SPARK-3365 is fixed
+
+    object SplitData {
+      def apply(s: Split): SplitData = {
+        SplitData(s.feature, s.threshold, s.featureType.id, s.categories)
+      }
+    }
+
+    /** Model data for model import/export */
+    case class NodeData(
+        id: Int,
+        predict: PredictData,
+        impurity: Double,
+        isLeaf: Boolean,
+        split: Option[SplitData],
+        leftNodeId: Option[Int],
+        rightNodeId: Option[Int],
+        stats: Option[InformationGainStatsData])
+
+    object NodeData {
+      def apply(n: Node): NodeData = {
+        NodeData(n.id, PredictData(n.predict), n.impurity, n.isLeaf, n.split.map(SplitData.apply),
+          n.leftNode.map(_.id), n.rightNode.map(_.id), n.stats.map(InformationGainStatsData.apply))
+      }
+    }
+
+    def save(sc: SparkContext, path: String, model: DecisionTreeModel): Unit = {
+      val sqlContext = new SQLContext(sc)
+      import sqlContext.implicits._
+
+      // Create JSON metadata.
+      val metadataRDD = sc.parallelize(Seq((thisClassName, thisFormatVersion, model.algo.toString,
+        model.numNodes)), 1)
+        .toDataFrame("class", "version", "algo", "numNodes")
+      metadataRDD.toJSON.saveAsTextFile(Loader.metadataPath(path))
+
+      // Create Parquet data.
+      val nodeIterator = new DecisionTreeModel.NodeIterator(model)
+      val dataRDD: DataFrame = sc.parallelize(nodeIterator.toSeq).map(NodeData.apply).toDataFrame
+      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+    }
+
+    /**
+     * Node with its child IDs. This class is used for loading data and constructing a tree.
+     * The child IDs are relevant iff Node.isLeaf == false.
+     */
+    case class NodeWithKids(node: Node, leftChildId: Int, rightChildId: Int)
+
+    def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {
+      val datapath = Loader.dataPath(path)
+      val sqlContext = new SQLContext(sc)
+      // Load Parquet data.
+      val dataRDD = sqlContext.parquetFile(datapath)
+      // Check schema explicitly since erasure makes it hard to use match-case for checking.
+      Loader.checkSchema[NodeData](dataRDD.schema)
+      val nodesRDD: RDD[NodeWithKids] = readNodes(dataRDD)
+      // Collect tree nodes, and build them into a tree.
+      val tree = constructTree(nodesRDD.collect(), algo, datapath)
+      assert(tree.numNodes == numNodes,
+        s"Unable to load DecisionTreeModel data from: $datapath." +
+ + s" Expected $numNodes nodes but found ${tree.numNodes}") + tree + } + + /** + * Read nodes from the loaded data, and return each node with its child IDs. + * NOTE: The caller should check the schema. + */ + def readNodes(data: DataFrame): RDD[NodeWithKids] = { + val splitsRDD: RDD[Option[Split]] = + data.select("split.feature", "split.threshold", "split.featureType", "split.categories") + .map { row: Row => + if (row.isNullAt(0)) { + None + } else { + row match { + case Row(feature: Int, threshold: Double, featureType: Int, categories: Seq[_]) => + // Note: The type cast for categories is safe since we checked the schema. + Some(Split(feature, threshold, FeatureType(featureType), + categories.asInstanceOf[Seq[Double]].toList)) + } + } + } + val lrChildNodesRDD: RDD[Option[(Int, Int)]] = + data.select("leftNodeId", "rightNodeId").map { row: Row => + if (row.isNullAt(0)) { + None + } else { + row match { + case Row(leftNodeId: Int, rightNodeId: Int) => + Some((leftNodeId, rightNodeId)) + } + } + } + val gainStatsRDD: RDD[Option[InformationGainStats]] = data.select( + "stats.gain", "stats.impurity", "stats.leftImpurity", "stats.rightImpurity", + "stats.leftPredict.predict", "stats.leftPredict.prob", + "stats.rightPredict.predict", "stats.rightPredict.prob").map { row: Row => + if (row.isNullAt(0)) { + None + } else { + row match { + case Row(gain: Double, impurity: Double, leftImpurity: Double, rightImpurity: Double, + leftPredictPredict: Double, leftPredictProb: Double, + rightPredictPredict: Double, rightPredictProb: Double) => + Some(new InformationGainStats(gain, impurity, leftImpurity, rightImpurity, + new Predict(leftPredictPredict, leftPredictProb), + new Predict(rightPredictPredict, rightPredictProb))) + } + } + } + // nodesRDD stores (Node, leftChildId, rightChildId) where the child ids are only relevant if + // Node.isLeaf == false + data.select("id", "predict.predict", "predict.prob", "impurity", "isLeaf").rdd + .zip(splitsRDD).zip(lrChildNodesRDD).zip(gainStatsRDD).map { + case (((Row(id: Int, predictPredict: Double, predictProb: Double, + impurity: Double, isLeaf: Boolean), + split: Option[Split]), lrChildNodes: Option[(Int, Int)]), + gainStats: Option[InformationGainStats]) => + val (leftChildId, rightChildId) = lrChildNodes.getOrElse((-1, -1)) + NodeWithKids(new Node(id, new Predict(predictPredict, predictProb), impurity, isLeaf, + split, None, None, gainStats), + leftChildId, rightChildId) + } + } + + /** + * Given a list of nodes from a tree, construct the tree. + * @param nodes Array of all nodes in a tree. + * @param algo Algorithm tree is for. + * @param datapath Used for printing debugging messages if an error occurs. + */ + def constructTree( + nodes: Iterable[NodeWithKids], + algo: String, + datapath: String): DecisionTreeModel = { + // nodesMap: node id -> (node, leftChild, rightChild) + val nodesMap: Map[Int, NodeWithKids] = nodes.map(n => n.node.id -> n).toMap + assert(nodesMap.contains(1), + s"DecisionTree missing root node (id = 1) after loading from: $datapath") + val topNode = nodesMap(1) + linkSubtree(topNode, nodesMap) + new DecisionTreeModel(topNode.node, Algo.fromString(algo)) + } + + /** + * Link the given node to its children (if any), and recurse down the subtree. 
+     * @param nodeWithKids Node to link
+     * @param nodesMap Map storing all nodes as a map: node id -> (Node, leftChildId, rightChildId)
+     */
+    private def linkSubtree(
+        nodeWithKids: NodeWithKids,
+        nodesMap: Map[Int, NodeWithKids]): Unit = {
+      val (node, leftChildId, rightChildId) =
+        (nodeWithKids.node, nodeWithKids.leftChildId, nodeWithKids.rightChildId)
+      if (node.isLeaf) return
+      assert(nodesMap.contains(leftChildId),
+        s"DecisionTreeModel.load could not find child (id=$leftChildId) of node ${node.id}.")
+      assert(nodesMap.contains(rightChildId),
+        s"DecisionTreeModel.load could not find child (id=$rightChildId) of node ${node.id}.")
+      val leftChild = nodesMap(leftChildId)
+      val rightChild = nodesMap(rightChildId)
+      node.leftNode = Some(leftChild.node)
+      node.rightNode = Some(rightChild.node)
+      linkSubtree(leftChild, nodesMap)
+      linkSubtree(rightChild, nodesMap)
+    }
+  }
+
+  override def load(sc: SparkContext, path: String): DecisionTreeModel = {
+    val (loadedClassName, version, metadata) = Loader.loadMetadata(sc, path)
+    val (algo: String, numNodes: Int) = try {
+      val algo_numNodes = metadata.select("algo", "numNodes").collect()
+      assert(algo_numNodes.length == 1)
+      algo_numNodes(0) match {
+        case Row(a: String, n: Int) => (a, n)
+      }
+    } catch {
+      // Catch both Error and Exception since the checks above can throw either.
+      case e: Throwable =>
+        throw new Exception(
+          s"Unable to load DecisionTreeModel metadata from: ${Loader.metadataPath(path)}." +
+          s" Error message: ${e.getMessage}")
+    }
+    val classNameV1_0 = SaveLoadV1_0.thisClassName
+    (loadedClassName, version) match {
+      case (className, "1.0") if className == classNameV1_0 =>
+        SaveLoadV1_0.load(sc, path, algo, numNodes)
+      case _ => throw new Exception(
+        s"DecisionTreeModel.load did not recognize model with (className, format version):" +
+        s" ($loadedClassName, $version). Supported:\n" +
Supported:\n" + + s" ($classNameV1_0, 1.0)") + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala index 9a50ecb550c3..80990aa9a603 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala @@ -49,7 +49,9 @@ class InformationGainStats( gain == other.gain && impurity == other.impurity && leftImpurity == other.leftImpurity && - rightImpurity == other.rightImpurity + rightImpurity == other.rightImpurity && + leftPredict == other.leftPredict && + rightPredict == other.rightPredict } case _ => false } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Predict.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Predict.scala index 004838ee5ba0..ad4c0dbbfb3e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Predict.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Predict.scala @@ -32,4 +32,11 @@ class Predict( override def toString = { "predict = %f, prob = %f".format(predict, prob) } + + override def equals(other: Any): Boolean = { + other match { + case p: Predict => predict == p.predict && prob == p.prob + case _ => false + } + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index 22997110de8d..4c8e219cda38 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -21,12 +21,17 @@ import scala.collection.mutable import com.github.fommil.netlib.BLAS.{getInstance => blas} +import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.tree.configuration.Algo._ +import org.apache.spark.mllib.tree.configuration.Algo import org.apache.spark.mllib.tree.configuration.EnsembleCombiningStrategy._ +import org.apache.spark.mllib.util.{Saveable, Loader} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, DataFrame, SQLContext} + /** * :: Experimental :: @@ -38,9 +43,42 @@ import org.apache.spark.rdd.RDD @Experimental class RandomForestModel(override val algo: Algo, override val trees: Array[DecisionTreeModel]) extends TreeEnsembleModel(algo, trees, Array.fill(trees.size)(1.0), - combiningStrategy = if (algo == Classification) Vote else Average) { + combiningStrategy = if (algo == Classification) Vote else Average) + with Saveable { require(trees.forall(_.algo == algo)) + + override def save(sc: SparkContext, path: String): Unit = { + TreeEnsembleModel.SaveLoadV1_0.save(sc, path, this, + RandomForestModel.SaveLoadV1_0.thisClassName) + } + + override protected def formatVersion: String = TreeEnsembleModel.SaveLoadV1_0.thisFormatVersion +} + +object RandomForestModel extends Loader[RandomForestModel] { + + override def load(sc: SparkContext, path: String): RandomForestModel = { + val (loadedClassName, version, metadataRDD) = Loader.loadMetadata(sc, path) + val classNameV1_0 = SaveLoadV1_0.thisClassName + (loadedClassName, version) match { + case (className, "1.0") if className == classNameV1_0 => + val metadata = TreeEnsembleModel.SaveLoadV1_0.readMetadata(metadataRDD, path) + 
+        assert(metadata.treeWeights.forall(_ == 1.0))
+        val trees =
+          TreeEnsembleModel.SaveLoadV1_0.loadTrees(sc, path, classNameV1_0, metadata.treeAlgo)
+        new RandomForestModel(Algo.fromString(metadata.algo), trees)
+      case _ => throw new Exception(s"RandomForestModel.load did not recognize model" +
+        s" with (className, format version): ($loadedClassName, $version). Supported:\n" +
+        s"  ($classNameV1_0, 1.0)")
+    }
+  }
+
+  private object SaveLoadV1_0 {
+    // Hard-code class name string in case it changes in the future
+    def thisClassName = "org.apache.spark.mllib.tree.model.RandomForestModel"
+  }
+
 }
 
 /**
@@ -56,9 +94,42 @@ class GradientBoostedTreesModel(
     override val algo: Algo,
     override val trees: Array[DecisionTreeModel],
     override val treeWeights: Array[Double])
-  extends TreeEnsembleModel(algo, trees, treeWeights, combiningStrategy = Sum) {
+  extends TreeEnsembleModel(algo, trees, treeWeights, combiningStrategy = Sum)
+  with Saveable {
 
   require(trees.size == treeWeights.size)
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    TreeEnsembleModel.SaveLoadV1_0.save(sc, path, this,
+      GradientBoostedTreesModel.SaveLoadV1_0.thisClassName)
+  }
+
+  override protected def formatVersion: String = TreeEnsembleModel.SaveLoadV1_0.thisFormatVersion
+}
+
+object GradientBoostedTreesModel extends Loader[GradientBoostedTreesModel] {
+
+  override def load(sc: SparkContext, path: String): GradientBoostedTreesModel = {
+    val (loadedClassName, version, metadataRDD) = Loader.loadMetadata(sc, path)
+    val classNameV1_0 = SaveLoadV1_0.thisClassName
+    (loadedClassName, version) match {
+      case (className, "1.0") if className == classNameV1_0 =>
+        val metadata = TreeEnsembleModel.SaveLoadV1_0.readMetadata(metadataRDD, path)
+        assert(metadata.combiningStrategy == Sum.toString)
+        val trees =
+          TreeEnsembleModel.SaveLoadV1_0.loadTrees(sc, path, classNameV1_0, metadata.treeAlgo)
+        new GradientBoostedTreesModel(Algo.fromString(metadata.algo), trees, metadata.treeWeights)
+      case _ => throw new Exception(s"GradientBoostedTreesModel.load did not recognize model" +
+        s" with (className, format version): ($loadedClassName, $version). Supported:\n" +
+        s"  ($classNameV1_0, 1.0)")
+    }
+  }
+
+  private object SaveLoadV1_0 {
+    // Hard-code class name string in case it changes in the future
+    def thisClassName = "org.apache.spark.mllib.tree.model.GradientBoostedTreesModel"
+  }
+
 }
 
 /**
@@ -176,3 +247,112 @@ private[tree] sealed class TreeEnsembleModel(
    */
   def totalNumNodes: Int = trees.map(_.numNodes).sum
 }
+
+private[tree] object TreeEnsembleModel {
+
+  object SaveLoadV1_0 {
+
+    import DecisionTreeModel.SaveLoadV1_0.{InformationGainStatsData, PredictData, SplitData}
+
+    def thisFormatVersion = "1.0"
+
+    case class Metadata(
+        algo: String,
+        treeAlgo: String,
+        combiningStrategy: String,
+        treeWeights: Array[Double])
+
+    /**
+     * Model data for model import/export.
+     * We have to duplicate NodeData here since Spark SQL does not yet support extracting subfields
+     * of nested fields; once that is possible, we can use something like:
+     *   case class EnsembleNodeData(treeId: Int, node: NodeData),
+     * where NodeData is from DecisionTreeModel.
+     */
+    case class NodeData(
+        treeId: Int,
+        id: Int,
+        predict: PredictData,
+        impurity: Double,
+        isLeaf: Boolean,
+        split: Option[SplitData],
+        leftNodeId: Option[Int],
+        rightNodeId: Option[Int],
+        stats: Option[InformationGainStatsData])
+
+    object NodeData {
+      def apply(treeId: Int, n: Node): NodeData = {
+        NodeData(treeId,
+          n.id, PredictData(n.predict), n.impurity, n.isLeaf, n.split.map(SplitData.apply),
+          n.leftNode.map(_.id), n.rightNode.map(_.id), n.stats.map(InformationGainStatsData.apply))
+      }
+    }
+
+    def save(sc: SparkContext, path: String, model: TreeEnsembleModel, className: String): Unit = {
+      val sqlContext = new SQLContext(sc)
+      import sqlContext.implicits._
+
+      // Create JSON metadata.
+      val metadata = Metadata(model.algo.toString, model.trees(0).algo.toString,
+        model.combiningStrategy.toString, model.treeWeights)
+      val metadataRDD = sc.parallelize(Seq((className, thisFormatVersion, metadata)), 1)
+        .toDataFrame("class", "version", "metadata")
+      metadataRDD.toJSON.saveAsTextFile(Loader.metadataPath(path))
+
+      // Create Parquet data.
+      val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
+        val nodeIterator = new DecisionTreeModel.NodeIterator(tree)
+        nodeIterator.toSeq.map(node => NodeData(treeId, node))
+      }.toDataFrame
+      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+    }
+
+    /**
+     * Read metadata from the loaded metadata DataFrame.
+     * @param path Path for loading data, used for debug messages.
+     */
+    def readMetadata(metadata: DataFrame, path: String): Metadata = {
+      try {
+        // We rely on the try-catch for schema checking rather than creating a schema just for this.
+        val metadataArray = metadata.select("metadata.algo", "metadata.treeAlgo",
+          "metadata.combiningStrategy", "metadata.treeWeights").collect()
+        assert(metadataArray.size == 1)
+        Metadata(metadataArray(0).getString(0), metadataArray(0).getString(1),
+          metadataArray(0).getString(2), metadataArray(0).getAs[Seq[Double]](3).toArray)
+      } catch {
+        // Catch both Error and Exception since the checks above can throw either.
+        case e: Throwable =>
+          throw new Exception(
+            s"Unable to load TreeEnsembleModel metadata from: ${Loader.metadataPath(path)}." +
+            s" Error message: ${e.getMessage}")
+      }
+    }
+
+    /**
+     * Load trees for an ensemble, and return them in order.
+     * @param className Class name of the tree ensemble.
+     * @param treeAlgo Algorithm for individual trees (which may differ from the ensemble's
+     *                 algorithm).
+     */
+    def loadTrees(
+        sc: SparkContext,
+        path: String,
+        className: String,
+        treeAlgo: String): Array[DecisionTreeModel] = {
+      val datapath = Loader.dataPath(path)
+      val sqlContext = new SQLContext(sc)
+      val dataRDD = sqlContext.parquetFile(datapath)
+      val nodesRDD = DecisionTreeModel.SaveLoadV1_0.readNodes(dataRDD)
+      val treeIdRDD = dataRDD.select("treeId").map { case Row(treeId: Int) => treeId }
+      val trees: RDD[(Int, DecisionTreeModel)] = treeIdRDD.zip(nodesRDD).groupBy(_._1).map {
+        case (treeId, treeId_nodes) =>
+          val tree =
+            DecisionTreeModel.SaveLoadV1_0.constructTree(treeId_nodes.map(_._2), treeAlgo, datapath)
+          (treeId, tree)
+      }
+      trees.collect().sortBy(_._1).map(_._2)
+    }
+
+  }
+
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 9347eaf9221a..191a8220d277 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -26,11 +26,13 @@ import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.FeatureType._
-import org.apache.spark.mllib.tree.configuration.{QuantileStrategy, Strategy}
+import org.apache.spark.mllib.tree.configuration.{FeatureType, QuantileStrategy, Strategy}
 import org.apache.spark.mllib.tree.impl.{BaggedPoint, DecisionTreeMetadata, TreePoint}
 import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Variance}
-import org.apache.spark.mllib.tree.model.{InformationGainStats, DecisionTreeModel, Node}
+import org.apache.spark.mllib.tree.model._
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.util.Utils
+
 
 class DecisionTreeSuite extends FunSuite with MLlibTestSparkContext {
 
@@ -857,6 +859,31 @@ class DecisionTreeSuite extends FunSuite with MLlibTestSparkContext {
     assert(topNode.leftNode.get.impurity === 0.0)
     assert(topNode.rightNode.get.impurity === 0.0)
   }
+
+  test("NodeIterator") {
+    val model = DecisionTreeSuite.createModel(Classification)
+    val iterator = new DecisionTreeModel.NodeIterator(model)
+    val nodeIds = iterator.map(_.id).toArray.sorted
+    assert(nodeIds === DecisionTreeSuite.createdModelNodeIds)
+  }
+
+  test("model save/load") {
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+
+    Array(Classification, Regression).foreach { algo =>
+      val model = DecisionTreeSuite.createModel(algo)
+
+      // Save model, load it back, and compare.
+      try {
+        model.save(sc, path)
+        val sameModel = DecisionTreeModel.load(sc, path)
+        DecisionTreeSuite.checkEqual(model, sameModel)
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
 }
 
 object DecisionTreeSuite {
@@ -979,4 +1006,82 @@ object DecisionTreeSuite {
     arr
   }
 
+  /** Create a leaf node with the given node ID */
+  private def createLeafNode(id: Int): Node = {
+    Node(nodeIndex = id, new Predict(0.0, 1.0), impurity = 0.5, isLeaf = true)
+  }
+
+  /**
+   * Create an internal node with the given node ID and feature type.
+   * Note: This does NOT set the child nodes.
+   */
+  private def createInternalNode(id: Int, featureType: FeatureType): Node = {
+    val node = Node(nodeIndex = id, new Predict(0.0, 1.0), impurity = 0.5, isLeaf = false)
+    featureType match {
+      case Continuous =>
+        node.split = Some(new Split(feature = 0, threshold = 0.5, Continuous,
+          categories = List.empty[Double]))
+      case Categorical =>
+        node.split = Some(new Split(feature = 1, threshold = 0.0, Categorical,
+          categories = List(0.0, 1.0)))
+    }
+    node.stats = Some(new InformationGainStats(gain = 0.1, impurity = 0.2,
+      leftImpurity = 0.3, rightImpurity = 0.4, new Predict(1.0, 0.4), new Predict(0.0, 0.6)))
+    node
+  }
+
+  /**
+   * Create a tree model. This is deterministic and contains a variety of node and feature types.
+   */
+  private[tree] def createModel(algo: Algo): DecisionTreeModel = {
+    val topNode = createInternalNode(id = 1, Continuous)
+    val (node2, node3) = (createLeafNode(id = 2), createInternalNode(id = 3, Categorical))
+    val (node6, node7) = (createLeafNode(id = 6), createLeafNode(id = 7))
+    topNode.leftNode = Some(node2)
+    topNode.rightNode = Some(node3)
+    node3.leftNode = Some(node6)
+    node3.rightNode = Some(node7)
+    new DecisionTreeModel(topNode, algo)
+  }
+
+  /** Sorted Node IDs matching the model returned by [[createModel()]] */
+  private val createdModelNodeIds = Array(1, 2, 3, 6, 7)
+
+  /**
+   * Check if the two trees are exactly the same.
+   * Note: I hesitate to override Node.equals since it could cause problems if users
+   *       make mistakes such as creating loops of Nodes.
+   * If the trees are not equal, this prints the two trees and throws an exception.
+   */
+  private[tree] def checkEqual(a: DecisionTreeModel, b: DecisionTreeModel): Unit = {
+    if (a.algo != b.algo || !isEqual(a.topNode, b.topNode)) {
+      throw new Exception("checkEqual failed since the two trees were not identical.\n" +
+        "TREE A:\n" + a.toDebugString + "\n" +
+        "TREE B:\n" + b.toDebugString + "\n")
+    }
+  }
+
+  /**
+   * Return true iff the two nodes and their descendants are exactly the same.
+   * Note: I hesitate to override Node.equals since it could cause problems if users
+   *       make mistakes such as creating loops of Nodes.
+   */
+  private def isEqual(a: Node, b: Node): Boolean = {
+    if (a.id != b.id || a.predict != b.predict || a.impurity != b.impurity || a.isLeaf != b.isLeaf
+        || a.split != b.split || a.stats != b.stats) {
+      return false
+    }
+    val leftEqual = (a.leftNode, b.leftNode) match {
+      case (None, None) => true
+      case (Some(aNode: Node), Some(bNode: Node)) => isEqual(aNode, bNode)
+      case (_, _) => false
+    }
+    val rightEqual = (a.rightNode, b.rightNode) match {
+      case (None, None) => true
+      case (Some(aNode: Node), Some(bNode: Node)) => isEqual(aNode, bNode)
+      case (_, _) => false
+    }
+    leftEqual && rightEqual
+  }
+
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
index e8341a5d0d10..bde47606eb00 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/GradientBoostedTreesSuite.scala
@@ -24,8 +24,10 @@ import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Strategy}
 import org.apache.spark.mllib.tree.impurity.Variance
 import org.apache.spark.mllib.tree.loss.{AbsoluteError, SquaredError, LogLoss}
-
+import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.util.Utils
+
 
 /**
  * Test suite for [[GradientBoostedTrees]].
  */
@@ -35,32 +37,30 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
 
   test("Regression with continuous features: SquaredError") {
     GradientBoostedTreesSuite.testCombinations.foreach {
       case (numIterations, learningRate, subsamplingRate) =>
-        GradientBoostedTreesSuite.randomSeeds.foreach { randomSeed =>
-          val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
-
-          val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
-            categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
-          val boostingStrategy =
-            new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)
-
-          val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
-
-          assert(gbt.trees.size === numIterations)
-          try {
-            EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.06)
-          } catch {
-            case e: java.lang.AssertionError =>
-              println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
-                s" subsamplingRate=$subsamplingRate")
-              throw e
-          }
-
-          val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
-          val dt = DecisionTree.train(remappedInput, treeStrategy)
-
-          // Make sure trees are the same.
-          assert(gbt.trees.head.toString == dt.toString)
+        val rdd = sc.parallelize(GradientBoostedTreesSuite.data, 2)
+
+        val treeStrategy = new Strategy(algo = Regression, impurity = Variance, maxDepth = 2,
+          categoricalFeaturesInfo = Map.empty, subsamplingRate = subsamplingRate)
+        val boostingStrategy =
+          new BoostingStrategy(treeStrategy, SquaredError, numIterations, learningRate)
+
+        val gbt = GradientBoostedTrees.train(rdd, boostingStrategy)
+
+        assert(gbt.trees.size === numIterations)
+        try {
+          EnsembleTestHelper.validateRegressor(gbt, GradientBoostedTreesSuite.data, 0.06)
+        } catch {
+          case e: java.lang.AssertionError =>
+            println(s"FAILED for numIterations=$numIterations, learningRate=$learningRate," +
+              s" subsamplingRate=$subsamplingRate")
+            throw e
         }
+
+        val remappedInput = rdd.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+        val dt = DecisionTree.train(remappedInput, treeStrategy)
+
+        // Make sure trees are the same.
+        assert(gbt.trees.head.toString == dt.toString)
     }
   }
 
@@ -133,14 +133,37 @@ class GradientBoostedTreesSuite extends FunSuite with MLlibTestSparkContext {
       BoostingStrategy.defaultParams(algo)
     }
   }
+
+  test("model save/load") {
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+
+    val trees = Range(0, 3).map(_ => DecisionTreeSuite.createModel(Regression)).toArray
+    val treeWeights = Array(0.1, 0.3, 1.1)
+
+    Array(Classification, Regression).foreach { algo =>
+      val model = new GradientBoostedTreesModel(algo, trees, treeWeights)
+
+      // Save model, load it back, and compare.
+      try {
+        model.save(sc, path)
+        val sameModel = GradientBoostedTreesModel.load(sc, path)
+        assert(model.algo == sameModel.algo)
+        model.trees.zip(sameModel.trees).foreach { case (treeA, treeB) =>
+          DecisionTreeSuite.checkEqual(treeA, treeB)
+        }
+        assert(model.treeWeights === sameModel.treeWeights)
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
 }
 
-object GradientBoostedTreesSuite {
+private object GradientBoostedTreesSuite {
 
   // Combinations for estimators, learning rates and subsamplingRate
   val testCombinations = Array((10, 1.0, 1.0), (10, 0.1, 1.0), (10, 0.5, 0.75), (10, 0.1, 0.75))
 
-  val randomSeeds = Array(681283, 4398)
-
   val data = EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 10, 100)
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala
index 55e963977b54..ee3bc9848686 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/RandomForestSuite.scala
@@ -27,8 +27,10 @@ import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.Strategy
 import org.apache.spark.mllib.tree.impl.DecisionTreeMetadata
 import org.apache.spark.mllib.tree.impurity.{Gini, Variance}
-import org.apache.spark.mllib.tree.model.Node
+import org.apache.spark.mllib.tree.model.{Node, RandomForestModel}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.util.Utils
+
 
 /**
  * Test suite for [[RandomForest]].
@@ -212,6 +214,26 @@ class RandomForestSuite extends FunSuite with MLlibTestSparkContext {
 
     assert(rf1.toDebugString != rf2.toDebugString)
   }
-}
-
+
+  test("model save/load") {
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+
+    Array(Classification, Regression).foreach { algo =>
+      val trees = Range(0, 3).map(_ => DecisionTreeSuite.createModel(algo)).toArray
+      val model = new RandomForestModel(algo, trees)
+
+      // Save model, load it back, and compare.
+      try {
+        model.save(sc, path)
+        val sameModel = RandomForestModel.load(sc, path)
+        assert(model.algo == sameModel.algo)
+        model.trees.zip(sameModel.trees).foreach { case (treeA, treeB) =>
+          DecisionTreeSuite.checkEqual(treeA, treeB)
+        }
+      } finally {
+        Utils.deleteRecursively(tempDir)
+      }
+    }
+  }
+}
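
Usage note (illustrative only, not part of the diff): a minimal sketch of the save/load round trip this patch enables. The SparkContext `sc`, the `trainingData` RDD, and the output path are assumed placeholders.

// Hypothetical round trip exercising the new Saveable/Loader API.
// Assumes an existing SparkContext `sc` and an RDD[LabeledPoint] `trainingData`.
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel

val model = DecisionTree.trainClassifier(trainingData, numClasses = 2,
  categoricalFeaturesInfo = Map.empty[Int, Int], impurity = "gini",
  maxDepth = 5, maxBins = 32)

// save() writes JSON metadata plus Parquet node data under the given path.
model.save(sc, "target/tmp/myDecisionTreeModel")

// load() reads the metadata, dispatches on (class name, format version),
// and rebuilds the tree from the stored nodes.
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeModel")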