Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select a commit. Hold Shift + click to select a range.
d1e22d5
tuning summary
YY-OnCall Dec 3, 2016
a7cfa63
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Dec 5, 2016
ad73c12
add ut
YY-OnCall Dec 5, 2016
425a419
add comments
YY-OnCall Dec 5, 2016
bd18c00
get default spark session
YY-OnCall Dec 13, 2016
2a0af1d
resolve merge conflict
YY-OnCall Feb 22, 2017
1a594d0
merge conflict
YY-OnCall Jul 5, 2017
0a698fe
support cross validation
YY-OnCall Jul 5, 2017
bd459b1
update version
YY-OnCall Jul 5, 2017
4e3e19c
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Jul 24, 2017
c0bc81a
improve unit test
YY-OnCall Jul 24, 2017
bbf3f9f
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Aug 8, 2017
b6a7c53
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Aug 9, 2017
72aea62
remove TuningSummary
YY-OnCall Aug 9, 2017
91da358
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Sep 10, 2017
297091f
update for pipeline
YY-OnCall Sep 11, 2017
670467a
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Nov 19, 2017
36b1dd5
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Dec 29, 2017
5844e0c
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Jan 1, 2018
8c829b5
merge conflict
YY-OnCall Jul 23, 2018
4aaf8e5
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Jul 23, 2018
ceaad1c
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Jul 27, 2018
41c4c12
Merge remote-tracking branch 'upstream/master' into tuningsummary
YY-OnCall Jul 29, 2018
4aef3aa
remove sort add comments
YY-OnCall Jul 29, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ object ModelSelectionViaCrossValidationExample {
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
cvModel.tuningSummary.show()
// $example off$

spark.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ object ModelSelectionViaTrainValidationSplitExample {
model.transform(test)
.select("features", "label", "prediction")
.show()
model.tuningSummary.show()
// $example off$

spark.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ class CrossValidatorModel private[ml] (
bestModel.transformSchema(schema)
}

/**
 * Summary of the grid search tuning as a DataFrame. Each row contains one candidate
 * paramMap (one column per tuned param, JSON-encoded as a string) and the corresponding
 * cross-validation metric (`avgMetrics`) of the model trained with it.
 * Lazily built on first access via `getTuningSummaryDF`.
 */
@Since("2.3.0")
lazy val tuningSummary: DataFrame = this.getTuningSummaryDF(avgMetrics)

@Since("1.4.0")
override def copy(extra: ParamMap): CrossValidatorModel = {
val copied = new CrossValidatorModel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,13 @@ class TrainValidationSplitModel private[ml] (
bestModel.transformSchema(schema)
}

/**
 * Summary of the grid search tuning as a DataFrame. Each row contains one candidate
 * paramMap (one column per tuned param, JSON-encoded as a string) and the corresponding
 * validation metric (`validationMetrics`) of the model trained with it.
 * Lazily built on first access via `getTuningSummaryDF`.
 */
@Since("2.3.0")
lazy val tuningSummary: DataFrame = this.getTuningSummaryDF(validationMetrics)

@Since("1.5.0")
override def copy(extra: ParamMap): TrainValidationSplitModel = {
val copied = new TrainValidationSplitModel (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import org.json4s.{DefaultFormats, _}
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkContext
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.{Estimator, Model, Pipeline}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.shared.HasSeed
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

/**
* Common params for [[TrainValidationSplitParams]] and [[CrossValidatorParams]].
Expand Down Expand Up @@ -85,6 +86,54 @@ private[ml] trait ValidatorParams extends HasSeed with Params {
instrumentation.logNamedValue("evaluator", $(evaluator).getClass.getCanonicalName)
instrumentation.logNamedValue("estimatorParamMapsLength", $(estimatorParamMaps).length)
}


/**
* @return Summary of grid search tuning in the format of DataFrame. Each row contains one
* candidate paramMap and the corresponding metric of trained model.
*/
/**
 * Builds a tuning-summary DataFrame for a grid search: one row per candidate
 * paramMap, with one string column per tuned param (JSON-encoded value) and a
 * final double column holding that candidate's evaluation metric.
 *
 * @param metrics one metric per entry of `estimatorParamMaps`, in the same order
 * @return DataFrame of shape (numCandidates) x (numTunedParams + 1)
 */
protected def getTuningSummaryDF(metrics: Array[Double]): DataFrame = {
  val candidateMaps = $(estimatorParamMaps)
  require(candidateMaps.nonEmpty, "estimator param maps should not be empty")
  require(candidateMaps.length == metrics.length,
    "estimator param maps number should match metrics")

  // Name the metric column after the evaluator's metric when the evaluator type
  // exposes one; otherwise fall back to the generic "metrics".
  val metricColName = $(evaluator) match {
    case e: BinaryClassificationEvaluator => e.getMetricName
    case e: MulticlassClassificationEvaluator => e.getMetricName
    case e: RegressionEvaluator => e.getMetricName
    case _ => "metrics"
  }

  val session = SparkSession.builder().getOrCreate()

  // Union of every tuned param across all candidates, since individual
  // paramMaps do not necessarily cover the same set of params.
  val tunedParams = candidateMaps
    .flatMap(_.toSeq)
    .map(_.param.asInstanceOf[Param[Any]])
    .distinct
  val schema = new StructType(
    tunedParams.map(p => StructField(p.toString, StringType)) ++
      Array(StructField(metricColName, DoubleType)))

  // For each candidate, read each tuned param's value from a copy of the
  // estimator configured with that paramMap, falling back to defaults when
  // the map does not set the param.
  val rows = candidateMaps.zip(metrics).map { case (pMap, metric) =>
    val configured = $(estimator).copy(pMap)
    val encoded = tunedParams.map { param =>
      configured match {
        case pipeline: Pipeline =>
          // In a Pipeline the param belongs to a stage, so search stages first;
          // only if no stage carries it do we ask the pipeline itself.
          val fromStages = pipeline.getStages.flatMap(_.extractParamMap().get(param))
          param.jsonEncode(fromStages.headOption.getOrElse(configured.getOrDefault(param)))
        case _ =>
          param.jsonEncode(configured.getOrDefault(param))
      }
    }
    Row.fromSeq(encoded ++ Seq(metric))
  }
  session.createDataFrame(session.sparkContext.parallelize(rows), schema)
}
}

private[ml] object ValidatorParams {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import org.apache.spark.ml.{Estimator, Model, Pipeline}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}
import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.ml.feature.{HashingTF, MinMaxScaler}
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasInputCol
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.mllib.util.LinearDataGenerator
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLTest, MLTestingUtils}
import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.StructType

class CrossValidatorSuite
Expand Down Expand Up @@ -75,6 +75,73 @@ class CrossValidatorSuite
}
}

test("cross validation with tuning summary") {
  val logReg = new LogisticRegression
  val paramGrid: Array[ParamMap] = new ParamGridBuilder()
    .addGrid(logReg.regParam, Array(0.001, 1.0, 1000.0))
    .addGrid(logReg.maxIter, Array(0, 2))
    .build()
  val evaluator = new BinaryClassificationEvaluator
  val validator = new CrossValidator()
    .setEstimator(logReg)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(3)
  val fitted = validator.fit(dataset)
  // Each summary row holds the candidate param values (as strings) followed by
  // the fold-averaged metric for that candidate; compare as sets since row
  // order is not part of the contract.
  val expectedRows = paramGrid.zip(fitted.avgMetrics).map { case (pm, score) =>
    Row.fromSeq(pm.toSeq.map(_.value.toString) ++ Seq(score))
  }
  assert(fitted.tuningSummary.collect().toSet === expectedRows.toSet)
  assert(fitted.tuningSummary.columns.last === evaluator.getMetricName)
}

test("CrossValidation tuningSummary with Pipeline") {
  val df = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
  val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled")
  val logReg = new LogisticRegression().setFeaturesCol("scaled")
  val pipe = new Pipeline().setStages(Array(scaler, logReg))

  // Grid mixes params from two different pipeline stages.
  val paramGrid = new ParamGridBuilder()
    .addGrid(logReg.regParam, Array(0.001, 1.0))
    .addGrid(logReg.maxIter, Array(0, 2))
    .addGrid(scaler.min, Array(0.0, -1.0))
    .build()
  val evaluator = new BinaryClassificationEvaluator
  val validator = new CrossValidator()
    .setEstimator(pipe)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
    .setNumFolds(3)
  val fitted = validator.fit(df)
  val expectedRows = paramGrid.zip(fitted.avgMetrics).map { case (pm, score) =>
    Row.fromSeq(pm.toSeq.map(_.value.toString) ++ Seq(score))
  }
  assert(fitted.tuningSummary.collect().toSet === expectedRows.toSet)
  assert(fitted.tuningSummary.columns.last === evaluator.getMetricName)
  fitted.tuningSummary.show()
}

test("CV tuningSummary with non-overlapping params") {
  val dataset = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
  val lr = new LogisticRegression
  // The two candidate maps tune disjoint params; the summary must fill in the
  // estimator's default value for any param a map does not set.
  val lrParamMaps = Array(new ParamMap().put(lr.maxIter, 0),
    new ParamMap().put(lr.regParam, 0.01))
  val eval = new BinaryClassificationEvaluator
  val cv = new CrossValidator()
    .setEstimator(lr)
    .setEstimatorParamMaps(lrParamMaps)
    .setEvaluator(eval)
    .setNumFolds(3)
  val cvModel = cv.fit(dataset)
  // Use the declared default (lr.getOrDefault) instead of a hard-coded 0.0 so
  // the expectation stays valid if the default regParam ever changes; this also
  // mirrors the analogous TrainValidationSplit test.
  val expected = Seq((0, lr.getOrDefault(lr.regParam), cvModel.avgMetrics(0)),
    (lr.getOrDefault(lr.maxIter), 0.01, cvModel.avgMetrics(1))
  ).map(t => (t._1.toString, t._2.toString, t._3))
    .toDF(lr.maxIter.name, lr.regParam.name, eval.getMetricName)

  assert(cvModel.tuningSummary.collect().toSet === expected.collect().toSet)
  assert(cvModel.tuningSummary.columns.last === eval.getMetricName)
}

test("cross validation with linear regression") {
val dataset = sc.parallelize(
LinearDataGenerator.generateLinearInput(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ package org.apache.spark.ml.tuning
import java.io.File

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.{Estimator, Model, Pipeline}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, OneVsRest}
import org.apache.spark.ml.classification.LogisticRegressionSuite.generateLogisticInput
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, Evaluator, RegressionEvaluator}
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.HasInputCol
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.mllib.util.LinearDataGenerator
import org.apache.spark.sql.Dataset
import org.apache.spark.ml.util.{DefaultReadWriteTest, Identifiable, MLTest, MLTestingUtils}
import org.apache.spark.mllib.util.{LinearDataGenerator, MLlibTestSparkContext}
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types.StructType

class TrainValidationSplitSuite
Expand Down Expand Up @@ -73,6 +74,68 @@ class TrainValidationSplitSuite
}
}

test("train validation split tuningSummary") {
  val df = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
  val logReg = new LogisticRegression
  val paramGrid = new ParamGridBuilder()
    .addGrid(logReg.regParam, Array(0.001, 1.0, 1000.0))
    .addGrid(logReg.maxIter, Array(0, 2))
    .build()
  val evaluator = new BinaryClassificationEvaluator
  val split = new TrainValidationSplit()
    .setEstimator(logReg)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(evaluator)
  val fitted = split.fit(df)
  // One summary row per candidate: param values as strings, then the
  // validation metric; compare as sets since row order is unspecified.
  val expectedRows = paramGrid.zip(fitted.validationMetrics).map { case (pm, score) =>
    Row.fromSeq(pm.toSeq.map(_.value.toString) ++ Seq(score))
  }
  assert(fitted.tuningSummary.collect().toSet === expectedRows.toSet)
  assert(fitted.tuningSummary.columns.last === evaluator.getMetricName)
}

test("tuningSummary with non-overlapping params") {
  val df = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
  val logReg = new LogisticRegression
  // Disjoint candidate maps: each tunes a param the other leaves at default.
  val candidates = Array(new ParamMap().put(logReg.maxIter, 0),
    new ParamMap().put(logReg.regParam, 0.01))
  val evaluator = new BinaryClassificationEvaluator
  val split = new TrainValidationSplit()
    .setEstimator(logReg)
    .setEstimatorParamMaps(candidates)
    .setEvaluator(evaluator)
  val fitted = split.fit(df)
  // Params absent from a candidate map must appear as the estimator defaults.
  val expected = Seq(
    (0, logReg.getOrDefault(logReg.regParam), fitted.validationMetrics(0)),
    (logReg.getOrDefault(logReg.maxIter), 0.01, fitted.validationMetrics(1)))
    .map(t => (t._1.toString, t._2.toString, t._3))
    .toDF(logReg.maxIter.name, logReg.regParam.name, evaluator.getMetricName)

  assert(fitted.tuningSummary.collect().toSet === expected.collect().toSet)
  assert(fitted.tuningSummary.columns.last === evaluator.getMetricName)
}

test("train validation split tuningSummary with Pipeline") {
  val df = sc.parallelize(generateLogisticInput(1.0, 1.0, 100, 42), 2).toDF()
  val scaler = new MinMaxScaler().setInputCol("features").setOutputCol("scaled")
  val logReg = new LogisticRegression().setFeaturesCol("scaled")
  val pipe = new Pipeline().setStages(Array(scaler, logReg))

  // Each candidate map tunes a param from a different pipeline stage.
  val candidates = Array(new ParamMap().put(logReg.maxIter, 0),
    new ParamMap().put(scaler.min, -1.0))
  val evaluator = new BinaryClassificationEvaluator
  val split = new TrainValidationSplit()
    .setEstimator(pipe)
    .setEstimatorParamMaps(candidates)
    .setEvaluator(evaluator)
  val fitted = split.fit(df)
  // Omitted params are resolved from the stages' declared defaults.
  val expected = Seq(
    (0, scaler.getOrDefault(scaler.min), fitted.validationMetrics(0)),
    (logReg.getOrDefault(logReg.maxIter), -1.0, fitted.validationMetrics(1)))
    .map(t => (t._1.toString, t._2.toString, t._3))
    .toDF(logReg.maxIter.name, scaler.min.name, evaluator.getMetricName)
  assert(fitted.tuningSummary.collect().toSet === expected.collect().toSet)
  assert(fitted.tuningSummary.columns.last === evaluator.getMetricName)
}

test("train validation with linear regression") {
val dataset = sc.parallelize(
LinearDataGenerator.generateLinearInput(
Expand Down Expand Up @@ -100,7 +163,7 @@ class TrainValidationSplitSuite
assert(parent.getMaxIter === 10)
assert(tvsModel.validationMetrics.length === lrParamMaps.length)

eval.setMetricName("r2")
eval.setMetricName("r2")
val tvsModel2 = tvs.fit(dataset)
val parent2 = tvsModel2.bestModel.parent.asInstanceOf[LinearRegression]
assert(parent2.getRegParam === 0.001)
Expand Down