From 3bf2718c1a1e68273508e63499bb5d1cc8230155 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 24 Jan 2017 15:46:16 -0800 Subject: [PATCH 01/22] add trait offset --- .../spark/ml/param/shared/sharedParams.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index e3e03dfd43dd..0c3952be1dbd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -373,6 +373,21 @@ private[ml] trait HasWeightCol extends Params { final def getWeightCol: String = $(weightCol) } +/** + * Trait for shared param offsetCol. + */ +private[ml] trait HasOffsetCol extends Params { + + /** + * Param for offset column name. If this is not set or empty, we treat all instance offsets as 0.0. + * @group param + */ + final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "offset column name. If this is not set or empty, we treat all instance offsets as 0.0") + + /** @group getParam */ + final def getOffsetCol: String = $(offsetCol) +} + /** * Trait for shared param solver (default: "auto"). */ From 0e240eb313aa91cb645fb3ab8d70e51b6c65b3c7 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 24 Jan 2017 15:48:03 -0800 Subject: [PATCH 02/22] add offset setter --- .../ml/regression/GeneralizedLinearRegression.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 3ffed39898d8..1f5a4f07c426 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} */ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParams with HasFitIntercept with HasMaxIter with HasTol with HasRegParam with HasWeightCol - with HasSolver with Logging { + with HasSolver with HasOffsetCol with Logging { import GeneralizedLinearRegression._ @@ -223,6 +223,16 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val @Since("2.0.0") def setWeightCol(value: String): this.type = set(weightCol, value) + /** + * Sets the value of param [[offsetCol]]. + * If this is not set or empty, we treat all instance offsets as 0.0. + * Default is not set, so all instances have offset 0.0. + * + * @group setParam + */ + @Since("2.2.0") + def setOffsetCol(value: String): this.type = set(offsetCol, value) + /** * Sets the solver algorithm used for optimization. * Currently only supports "irls" which is also the default solver. From 9c41453a19c0f9c31403fafaf1995c642c37c70d Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 24 Jan 2017 21:15:50 -0800 Subject: [PATCH 03/22] implement offset in GLM --- .../IterativelyReweightedLeastSquares.scala | 14 ++-- .../GeneralizedLinearRegression.scala | 76 ++++++++++++++----- ...erativelyReweightedLeastSquaresSuite.scala | 27 +++---- 3 files changed, 77 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index 143bf539b0af..b0ac87aaf581 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -20,8 +20,10 @@ package org.apache.spark.ml.optim import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.regression.GLRInstance import org.apache.spark.rdd.RDD + /** * Model fitted by [[IterativelyReweightedLeastSquares]]. * @param coefficients model coefficients @@ -43,7 +45,7 @@ private[ml] class IterativelyReweightedLeastSquaresModel( * find M-estimator in robust regression and other optimization problems. * * @param initialModel the initial guess model. - * @param reweightFunc the reweight function which is used to update offsets and weights + * @param reweightFunc the reweight function which is used to update working labels and weights * at each iteration. * @param fitIntercept whether to fit intercept. * @param regParam L2 regularization parameter used by WLS. @@ -57,13 +59,13 @@ private[ml] class IterativelyReweightedLeastSquaresModel( */ private[ml] class IterativelyReweightedLeastSquares( val initialModel: WeightedLeastSquaresModel, - val reweightFunc: (Instance, WeightedLeastSquaresModel) => (Double, Double), + val reweightFunc: (GLRInstance, WeightedLeastSquaresModel) => (Double, Double), val fitIntercept: Boolean, val regParam: Double, val maxIter: Int, val tol: Double) extends Logging with Serializable { - def fit(instances: RDD[Instance]): IterativelyReweightedLeastSquaresModel = { + def fit(instances: RDD[GLRInstance]): IterativelyReweightedLeastSquaresModel = { var converged = false var iter = 0 @@ -75,10 +77,10 @@ private[ml] class IterativelyReweightedLeastSquares( oldModel = model - // Update offsets and weights using reweightFunc + // Update working labels and weights using reweightFunc val newInstances = instances.map { instance => - val (newOffset, newWeight) = reweightFunc(instance, oldModel) - Instance(newOffset, newWeight, instance.features) + val (newLabel, newWeight) = reweightFunc(instance, oldModel) + Instance(newLabel, newWeight, instance.features) } // Estimate new model diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 1f5a4f07c426..68023c192b42 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -273,17 +273,21 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val instances: RDD[Instance] = - dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { - case Row(label: Double, weight: Double, features: Vector) => - Instance(label, weight, features) + val off = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) + val instances: RDD[GLRInstance] = + dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { + case Row(label: Double, weight: Double, offset: Double, features: Vector) => + new GLRInstance(label, weight, offset, features) } val model = if (familyObj == Gaussian && linkObj == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. + val wlsInstances: RDD[Instance] = instances.map { instance => + new Instance(instance.label - instance.offset, instance.weight, instance.features) + } val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true) - val wlsModel = optimizer.fit(instances) + val wlsModel = optimizer.fit(wlsInstances) val model = copyValues( new GeneralizedLinearRegressionModel(uid, wlsModel.coefficients, wlsModel.intercept) .setParent(this)) @@ -349,12 +353,12 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine * Get the initial guess model for [[IterativelyReweightedLeastSquares]]. */ def initialize( - instances: RDD[Instance], + instances: RDD[GLRInstance], fitIntercept: Boolean, regParam: Double): WeightedLeastSquaresModel = { val newInstances = instances.map { instance => val mu = family.initialize(instance.label, instance.weight) - val eta = predict(mu) + val eta = predict(mu) - instance.offset Instance(eta, instance.weight, instance.features) } // TODO: Make standardizeFeatures and standardizeLabel configurable. @@ -368,13 +372,13 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine * The reweight function used to update offsets and weights * at each iteration of [[IterativelyReweightedLeastSquares]]. */ - val reweightFunc: (Instance, WeightedLeastSquaresModel) => (Double, Double) = { - (instance: Instance, model: WeightedLeastSquaresModel) => { - val eta = model.predict(instance.features) + val reweightFunc: (GLRInstance, WeightedLeastSquaresModel) => (Double, Double) = { + (instance: GLRInstance, model: WeightedLeastSquaresModel) => { + val eta = model.predict(instance.features) + instance.offset val mu = fitted(eta) - val offset = eta + (instance.label - mu) * link.deriv(mu) - val weight = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) - (offset, weight) + val newLabel = eta - instance.offset + (instance.label - mu) * link.deriv(mu) + val newWeight = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) + (newLabel, newWeight) } } } @@ -746,15 +750,27 @@ class GeneralizedLinearRegressionModel private[ml] ( private lazy val familyAndLink = new FamilyAndLink(familyObj, linkObj) override protected def predict(features: Vector): Double = { - val eta = predictLink(features) + if (!isSet(offsetCol) || $(offsetCol).isEmpty) { + val eta = BLAS.dot(features, coefficients) + intercept + familyAndLink.fitted(eta) + } else { + throw new SparkException("Must supply offset value when offset is set.") + } + } + + /** + * Calculates the predicted value when offset is set. + */ + protected def predict(features: Vector, offset: Double): Double = { + val eta = predictLink(features, offset) familyAndLink.fitted(eta) } /** - * Calculate the link prediction (linear predictor) of the given instance. + * Calculates the link prediction (linear predictor) of the given instance. */ - private def predictLink(features: Vector): Double = { - BLAS.dot(features, coefficients) + intercept + private def predictLink(features: Vector, offset: Double): Double = { + BLAS.dot(features, coefficients) + intercept + offset } override def transform(dataset: Dataset[_]): DataFrame = { @@ -763,14 +779,15 @@ class GeneralizedLinearRegressionModel private[ml] ( } override protected def transformImpl(dataset: Dataset[_]): DataFrame = { - val predictUDF = udf { (features: Vector) => predict(features) } - val predictLinkUDF = udf { (features: Vector) => predictLink(features) } + val predictUDF = udf { (features: Vector, offset: Double) => predict(features, offset) } + val predictLinkUDF = udf { (features: Vector, offset: Double) => predictLink(features, offset) } + val off = if (!isSet(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) var output = dataset if ($(predictionCol).nonEmpty) { - output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), off)) } if (hasLinkPredictionCol) { - output = output.withColumn($(linkPredictionCol), predictLinkUDF(col($(featuresCol)))) + output = output.withColumn($(linkPredictionCol), predictLinkUDF(col($(featuresCol)), off)) } output.toDF() } @@ -1172,3 +1189,20 @@ class GeneralizedLinearRegressionTrainingSummary private[regression] ( } } } + + + +/** + * Class that represents an instance of data point with + * label, weight, features and offset. + */ +private[ml] class GLRInstance(val label: Double, val weight: Double, val offset: Double, + val features: Vector) extends Serializable{ + + def this(instance: Instance, offset: Double = 0.0) = { + this(instance.label, instance.weight, offset, instance.features) + } + + /** Converts to an Instance object by ignoring the offset */ + private[ml] def toInstance: Instance = Instance(label, weight, features) +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala index 50260952ecb6..59e549fb9dcd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala @@ -20,14 +20,15 @@ package org.apache.spark.ml.optim import org.apache.spark.SparkFunSuite import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.regression.GLRInstance import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext { - private var instances1: RDD[Instance] = _ - private var instances2: RDD[Instance] = _ + private var instances1: RDD[GLRInstance] = _ + private var instances2: RDD[GLRInstance] = _ override def beforeAll(): Unit = { super.beforeAll() @@ -43,7 +44,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)), Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)), Instance(0.0, 4.0, Vectors.dense(3.0, 3.0)) - ), 2) + ), 2).map(new GLRInstance(_)) /* R code: @@ -56,7 +57,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2) + ), 2).map(new GLRInstance(_)) } test("IRLS against GLM with Binomial errors") { @@ -156,7 +157,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes var idx = 0 for (fitIntercept <- Seq(false, true)) { val initial = new WeightedLeastSquares(fitIntercept, regParam = 0.0, elasticNetParam = 0.0, - standardizeFeatures = false, standardizeLabel = false).fit(instances2) + standardizeFeatures = false, standardizeLabel = false).fit(instances2.map(_.toInstance)) val irls = new IterativelyReweightedLeastSquares(initial, L1RegressionReweightFunc, fitIntercept, regParam = 0.0, maxIter = 200, tol = 1e-7).fit(instances2) val actual = Vectors.dense(irls.intercept, irls.coefficients(0), irls.coefficients(1)) @@ -169,29 +170,29 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes object IterativelyReweightedLeastSquaresSuite { def BinomialReweightFunc( - instance: Instance, + instance: GLRInstance, model: WeightedLeastSquaresModel): (Double, Double) = { - val eta = model.predict(instance.features) + val eta = model.predict(instance.features) + instance.offset val mu = 1.0 / (1.0 + math.exp(-1.0 * eta)) - val z = eta + (instance.label - mu) / (mu * (1.0 - mu)) + val z = eta - instance.offset + (instance.label - mu) / (mu * (1.0 - mu)) val w = mu * (1 - mu) * instance.weight (z, w) } def PoissonReweightFunc( - instance: Instance, + instance: GLRInstance, model: WeightedLeastSquaresModel): (Double, Double) = { - val eta = model.predict(instance.features) + val eta = model.predict(instance.features) + instance.offset val mu = math.exp(eta) - val z = eta + (instance.label - mu) / mu + val z = eta - instance.offset + (instance.label - mu) / mu val w = mu * instance.weight (z, w) } def L1RegressionReweightFunc( - instance: Instance, + instance: GLRInstance, model: WeightedLeastSquaresModel): (Double, Double) = { - val eta = model.predict(instance.features) + val eta = model.predict(instance.features) + instance.offset val e = math.max(math.abs(eta - instance.label), 1e-7) val w = 1 / e val y = instance.label From 7823f8af8b0926790816c9e79e9425e503e494ad Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 24 Jan 2017 22:55:56 -0800 Subject: [PATCH 04/22] add test for glm with offset --- .../GeneralizedLinearRegression.scala | 17 +++-- .../GeneralizedLinearRegressionSuite.scala | 73 +++++++++++++++++++ 2 files changed, 83 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 68023c192b42..b4e597b4c262 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -225,6 +225,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val /** * Sets the value of param [[offsetCol]]. + * The feature specified as offset has a constant coefficient of 1.0. * If this is not set or empty, we treat all instance offsets as 0.0. * Default is not set, so all instances have offset 0.0. * @@ -1190,19 +1191,21 @@ class GeneralizedLinearRegressionTrainingSummary private[regression] ( } } - - /** - * Class that represents an instance of data point with - * label, weight, features and offset. + * Case class that represents an instance of data point with + * label, weight, offset and features. */ -private[ml] class GLRInstance(val label: Double, val weight: Double, val offset: Double, - val features: Vector) extends Serializable{ +private[ml] case class GLRInstance(label: Double, weight: Double, offset: Double, + features: Vector) { def this(instance: Instance, offset: Double = 0.0) = { this(instance.label, instance.weight, offset, instance.features) } - /** Converts to an Instance object by ignoring the offset */ + /** Converts to an [[Instance]] object by leaving out the offset. */ private[ml] def toInstance: Instance = Instance(label, weight, features) + } + + + diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 828b95e544ae..7e084e527c61 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -578,6 +578,79 @@ class GeneralizedLinearRegressionSuite } } + test("generalized linear regression with offset") { + /* + R code: + df <- as.data.frame(matrix(c( + 1.0, 1.0, 2.0, 0.0, 5.0, + 2.0, 2.0, 0.5, 1.0, 2.0, + 1.0, 3.0, 1.0, 2.0, 1.0, + 2.0, 4.0, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) + families <- c(gaussian, poisson, Gamma) + f1 <- V1 ~ -1 + V4 + V5 + f2 <- V1 ~ V4 + V5 + for (f in c(f1, f2)) { + for (fam in families) { + model <- glm(f, df, family = fam, weights = V2, offset = V3) + print(as.vector(coef(model))) + } + } + + [1] 0.535040431 0.005390836 + [1] 0.1968355 -0.2061711 + [1] 0.307996 -0.153579 + [1] -0.8800000 0.7342857 0.1714286 + [1] -1.9991044 0.7247511 0.1424392 + [1] -0.27378146 0.31599396 -0.06204946 + */ + val dataset = Seq( + GLRInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), + GLRInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), + GLRInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), + GLRInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + ).toDF() + + val expected = Seq( + Vectors.dense(0.0, 0.535040431, 0.005390836), + Vectors.dense(0.0, 0.1968355, -0.2061711), + Vectors.dense(0.0, 0.307996, -0.153579), + Vectors.dense(-0.88, 0.7342857, 0.1714286), + Vectors.dense(-1.9991044, 0.7247511, 0.1424392), + Vectors.dense(-0.27378146, 0.31599396, -0.06204946)) + + import GeneralizedLinearRegression._ + + var idx = 0 + for (fitIntercept <- Seq(false, true)) { + for (family <- Seq("gaussian", "poisson", "gamma")) { + val trainer = new GeneralizedLinearRegression().setFamily(family) + .setFitIntercept(fitIntercept).setOffsetCol("offset") + .setWeightCol("weight").setLinkPredictionCol("linkPrediction") + val model = trainer.fit(dataset) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~= expected(idx) absTol 1e-4, s"Model mismatch: GLM with family = $family," + + s" and fitIntercept = $fitIntercept.") + + val familyObj = Family.fromName(family) + val familyLink = new FamilyAndLink(familyObj, familyObj.defaultLink) + model.transform(dataset).select("features", "offset", "prediction", "linkPrediction") + .collect().foreach { + case Row(features: DenseVector, offset: Double, prediction1: Double, + linkPrediction1: Double) => + val eta = BLAS.dot(features, model.coefficients) + model.intercept + offset + val prediction2 = familyLink.fitted(eta) + val linkPrediction2 = eta + assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"family = $family, and fitIntercept = $fitIntercept.") + assert(linkPrediction1 ~= linkPrediction2 relTol 1E-5, "Link Prediction mismatch: " + + s"GLM with family = $family, and fitIntercept = $fitIntercept.") + } + + idx += 1 + } + } + } + test("glm summary: gaussian family with weight") { /* R code: From a1f56952b4ffcdfb400ff0ce014987c50de5f33e Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 24 Jan 2017 23:11:42 -0800 Subject: [PATCH 05/22] minor cleanup --- .../IterativelyReweightedLeastSquares.scala | 1 - .../spark/ml/param/shared/sharedParams.scala | 10 +++++----- .../GeneralizedLinearRegression.scala | 17 +++++++---------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index b0ac87aaf581..36c93caf0605 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -23,7 +23,6 @@ import org.apache.spark.ml.linalg._ import org.apache.spark.ml.regression.GLRInstance import org.apache.spark.rdd.RDD - /** * Model fitted by [[IterativelyReweightedLeastSquares]]. * @param coefficients model coefficients diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index 0c3952be1dbd..f2c84091cd2e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -374,14 +374,14 @@ private[ml] trait HasWeightCol extends Params { } /** - * Trait for shared param offsetCol. - */ + * Trait for shared param offsetCol. + */ private[ml] trait HasOffsetCol extends Params { /** - * Param for offset column name. If this is not set or empty, we treat all instance offsets as 0.0. - * @group param - */ + * Param for offset column name. If this is not set or empty, we treat all instance offsets as 0.0. + * @group param + */ final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "offset column name. If this is not set or empty, we treat all instance offsets as 0.0") /** @group getParam */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index b4e597b4c262..504fd9c0b71c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -224,13 +224,13 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val def setWeightCol(value: String): this.type = set(weightCol, value) /** - * Sets the value of param [[offsetCol]]. - * The feature specified as offset has a constant coefficient of 1.0. - * If this is not set or empty, we treat all instance offsets as 0.0. - * Default is not set, so all instances have offset 0.0. - * - * @group setParam - */ + * Sets the value of param [[offsetCol]]. + * The feature specified as offset has a constant coefficient of 1.0. + * If this is not set or empty, we treat all instance offsets as 0.0. + * Default is not set, so all instances have offset 0.0. + * + * @group setParam + */ @Since("2.2.0") def setOffsetCol(value: String): this.type = set(offsetCol, value) @@ -1206,6 +1206,3 @@ private[ml] case class GLRInstance(label: Double, weight: Double, offset: Double private[ml] def toInstance: Instance = Instance(label, weight, features) } - - - From d071b95ccc94404d37cde7cc122cf8a13fd04449 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 24 Jan 2017 23:44:04 -0800 Subject: [PATCH 06/22] add doc for GLRInstance --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 504fd9c0b71c..7168b45c2e97 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -1194,10 +1194,16 @@ class GeneralizedLinearRegressionTrainingSummary private[regression] ( /** * Case class that represents an instance of data point with * label, weight, offset and features. + * + * @param label Label for this data point. + * @param weight The weight of this instance. + * @param offset The offset used for this data point. + * @param features The vector of features for this data point. */ private[ml] case class GLRInstance(label: Double, weight: Double, offset: Double, features: Vector) { + /** Constructs from an [[Instance]] object and offset */ def this(instance: Instance, offset: Double = 0.0) = { this(instance.label, instance.weight, offset, instance.features) } From d2afcb0c335c133f1a86c8587fd7459ac39935a0 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Wed, 25 Jan 2017 09:24:54 -0800 Subject: [PATCH 07/22] remove offset from shared param --- .../spark/ml/param/shared/sharedParams.scala | 15 --------------- .../regression/GeneralizedLinearRegression.scala | 12 +++++++++++- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala index f2c84091cd2e..e3e03dfd43dd 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala @@ -373,21 +373,6 @@ private[ml] trait HasWeightCol extends Params { final def getWeightCol: String = $(weightCol) } -/** - * Trait for shared param offsetCol. - */ -private[ml] trait HasOffsetCol extends Params { - - /** - * Param for offset column name. If this is not set or empty, we treat all instance offsets as 0.0. - * @group param - */ - final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "offset column name. If this is not set or empty, we treat all instance offsets as 0.0") - - /** @group getParam */ - final def getOffsetCol: String = $(offsetCol) -} - /** * Trait for shared param solver (default: "auto"). */ diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 7168b45c2e97..c579d54dab3d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -41,7 +41,7 @@ import org.apache.spark.sql.types.{DataType, DoubleType, StructType} */ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParams with HasFitIntercept with HasMaxIter with HasTol with HasRegParam with HasWeightCol - with HasSolver with HasOffsetCol with Logging { + with HasSolver with Logging { import GeneralizedLinearRegression._ @@ -94,6 +94,16 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam @Since("2.0.0") def getLinkPredictionCol: String = $(linkPredictionCol) + /** + * Param for offset column name. If this is not set or empty, we treat all instance offsets as 0.0. + * @group param + */ + final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "offset column name. " + + "If this is not set or empty, we treat all instance offsets as 0.0") + + /** @group getParam */ + def getOffsetCol: String = $(offsetCol) + /** Checks whether we should output link prediction. */ private[regression] def hasLinkPredictionCol: Boolean = { isDefined(linkPredictionCol) && $(linkPredictionCol).nonEmpty From 9eca1a682d75e20df89ebdb0f1a01e02996f9c7f Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Wed, 25 Jan 2017 09:34:01 -0800 Subject: [PATCH 08/22] fix style issue --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index c579d54dab3d..633cb51b67b5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -95,11 +95,12 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam def getLinkPredictionCol: String = $(linkPredictionCol) /** - * Param for offset column name. If this is not set or empty, we treat all instance offsets as 0.0. + * Param for offset column name. If this is not set or empty, we treat all + * instance offsets as 0.0. * @group param */ - final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "offset column name. " + - "If this is not set or empty, we treat all instance offsets as 0.0") + final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "The offset " + + "column name. If this is not set or empty, we treat all instance offsets as 0.0") /** @group getParam */ def getOffsetCol: String = $(offsetCol) From d44974cfe50092bb639a31aa7aa9b16eb1d21fae Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Wed, 25 Jan 2017 14:11:04 -0800 Subject: [PATCH 09/22] rename to OffsetInstance and add param check --- .../apache/spark/ml/feature/Instance.scala | 22 +++++++++ .../IterativelyReweightedLeastSquares.scala | 7 ++- .../GeneralizedLinearRegression.scala | 47 ++++++------------- ...erativelyReweightedLeastSquaresSuite.scala | 17 ++++--- .../GeneralizedLinearRegressionSuite.scala | 10 ++-- 5 files changed, 52 insertions(+), 51 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index cce3ca45ccd8..ba9224663cf4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -27,3 +27,25 @@ import org.apache.spark.ml.linalg.Vector * @param features The vector of features for this data point. */ private[ml] case class Instance(label: Double, weight: Double, features: Vector) + +/** + * Case class that represents an instance of data point with + * label, weight, offset and features. + * + * @param label Label for this data point. + * @param weight The weight of this instance. + * @param offset The offset used for this data point. + * @param features The vector of features for this data point. + */ +private[ml] case class OffsetInstance(label: Double, weight: Double, offset: Double, + features: Vector) { + + /** Constructs from an [[Instance]] object and offset */ + def this(instance: Instance, offset: Double = 0.0) = { + this(instance.label, instance.weight, offset, instance.features) + } + + /** Converts to an [[Instance]] object by leaving out the offset. */ + private[ml] def toInstance: Instance = Instance(label, weight, features) + +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index 36c93caf0605..ff05a2eaa75f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -18,9 +18,8 @@ package org.apache.spark.ml.optim import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg._ -import org.apache.spark.ml.regression.GLRInstance import org.apache.spark.rdd.RDD /** @@ -58,13 +57,13 @@ private[ml] class IterativelyReweightedLeastSquaresModel( */ private[ml] class IterativelyReweightedLeastSquares( val initialModel: WeightedLeastSquaresModel, - val reweightFunc: (GLRInstance, WeightedLeastSquaresModel) => (Double, Double), + val reweightFunc: (OffsetInstance, WeightedLeastSquaresModel) => (Double, Double), val fitIntercept: Boolean, val regParam: Double, val maxIter: Int, val tol: Double) extends Logging with Serializable { - def fit(instances: RDD[GLRInstance]): IterativelyReweightedLeastSquaresModel = { + def fit(instances: RDD[OffsetInstance]): IterativelyReweightedLeastSquaresModel = { var converged = false var iter = 0 diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 633cb51b67b5..66b2ff463abe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg.{BLAS, Vector} import org.apache.spark.ml.optim._ import org.apache.spark.ml.param._ @@ -123,6 +123,9 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam s"with ${$(family)} family does not support ${$(link)} link function.") } val newSchema = super.validateAndTransformSchema(schema, fitting, featuresDataType) + if (isSet(offsetCol) && $(offsetCol).nonEmpty) { + SchemaUtils.checkNumericType(schema, $(offsetCol)) + } if (hasLinkPredictionCol) { SchemaUtils.appendColumn(newSchema, $(linkPredictionCol), DoubleType) } else { @@ -286,16 +289,16 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val off = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) - val instances: RDD[GLRInstance] = + val instances: RDD[OffsetInstance] = dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, offset: Double, features: Vector) => - new GLRInstance(label, weight, offset, features) + OffsetInstance(label, weight, offset, features) } val model = if (familyObj == Gaussian && linkObj == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. val wlsInstances: RDD[Instance] = instances.map { instance => - new Instance(instance.label - instance.offset, instance.weight, instance.features) + Instance(instance.label - instance.offset, instance.weight, instance.features) } val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true) @@ -365,7 +368,7 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine * Get the initial guess model for [[IterativelyReweightedLeastSquares]]. */ def initialize( - instances: RDD[GLRInstance], + instances: RDD[OffsetInstance], fitIntercept: Boolean, regParam: Double): WeightedLeastSquaresModel = { val newInstances = instances.map { instance => @@ -384,11 +387,11 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine * The reweight function used to update offsets and weights * at each iteration of [[IterativelyReweightedLeastSquares]]. */ - val reweightFunc: (GLRInstance, WeightedLeastSquaresModel) => (Double, Double) = { - (instance: GLRInstance, model: WeightedLeastSquaresModel) => { - val eta = model.predict(instance.features) + instance.offset - val mu = fitted(eta) - val newLabel = eta - instance.offset + (instance.label - mu) * link.deriv(mu) + val reweightFunc: (OffsetInstance, WeightedLeastSquaresModel) => (Double, Double) = { + (instance: OffsetInstance, model: WeightedLeastSquaresModel) => { + val eta = model.predict(instance.features) + val mu = fitted(eta + instance.offset) + val newLabel = eta + (instance.label - mu) * link.deriv(mu) val newWeight = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) (newLabel, newWeight) } @@ -766,7 +769,7 @@ class GeneralizedLinearRegressionModel private[ml] ( val eta = BLAS.dot(features, coefficients) + intercept familyAndLink.fitted(eta) } else { - throw new SparkException("Must supply offset value when offset is set.") + throw new SparkException("Must supply offset to predict when offset column is set.") } } @@ -1201,25 +1204,3 @@ class GeneralizedLinearRegressionTrainingSummary private[regression] ( } } } - -/** - * Case class that represents an instance of data point with - * label, weight, offset and features. - * - * @param label Label for this data point. - * @param weight The weight of this instance. - * @param offset The offset used for this data point. - * @param features The vector of features for this data point. - */ -private[ml] case class GLRInstance(label: Double, weight: Double, offset: Double, - features: Vector) { - - /** Constructs from an [[Instance]] object and offset */ - def this(instance: Instance, offset: Double = 0.0) = { - this(instance.label, instance.weight, offset, instance.features) - } - - /** Converts to an [[Instance]] object by leaving out the offset. */ - private[ml] def toInstance: Instance = Instance(label, weight, features) - -} diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala index 59e549fb9dcd..cdce453f411a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala @@ -18,17 +18,16 @@ package org.apache.spark.ml.optim import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg.Vectors -import org.apache.spark.ml.regression.GLRInstance import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.rdd.RDD class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTestSparkContext { - private var instances1: RDD[GLRInstance] = _ - private var instances2: RDD[GLRInstance] = _ + private var instances1: RDD[OffsetInstance] = _ + private var instances2: RDD[OffsetInstance] = _ override def beforeAll(): Unit = { super.beforeAll() @@ -44,7 +43,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)), Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)), Instance(0.0, 4.0, Vectors.dense(3.0, 3.0)) - ), 2).map(new GLRInstance(_)) + ), 2).map(new OffsetInstance(_)) /* R code: @@ -57,7 +56,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2).map(new GLRInstance(_)) + ), 2).map(new OffsetInstance(_)) } test("IRLS against GLM with Binomial errors") { @@ -170,7 +169,7 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes object IterativelyReweightedLeastSquaresSuite { def BinomialReweightFunc( - instance: GLRInstance, + instance: OffsetInstance, model: WeightedLeastSquaresModel): (Double, Double) = { val eta = model.predict(instance.features) + instance.offset val mu = 1.0 / (1.0 + math.exp(-1.0 * eta)) @@ -180,7 +179,7 @@ object IterativelyReweightedLeastSquaresSuite { } def PoissonReweightFunc( - instance: GLRInstance, + instance: OffsetInstance, model: WeightedLeastSquaresModel): (Double, Double) = { val eta = model.predict(instance.features) + instance.offset val mu = math.exp(eta) @@ -190,7 +189,7 @@ object IterativelyReweightedLeastSquaresSuite { } def L1RegressionReweightFunc( - instance: GLRInstance, + instance: OffsetInstance, model: WeightedLeastSquaresModel): (Double, Double) = { val eta = model.predict(instance.features) + instance.offset val e = math.max(math.abs(eta - instance.label), 1e-7) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 7e084e527c61..c90afc255e3b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -21,7 +21,7 @@ import scala.util.Random import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LogisticRegressionSuite._ -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors} import org.apache.spark.ml.param.{ParamMap, ParamsSuite} @@ -604,10 +604,10 @@ class GeneralizedLinearRegressionSuite [1] -0.27378146 0.31599396 -0.06204946 */ val dataset = Seq( - GLRInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), - GLRInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), - GLRInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), - GLRInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + OffsetInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), + OffsetInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) ).toDF() val expected = Seq( From 9c320ee3cf12b9403a887e17659c87abd33a84aa Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Thu, 26 Jan 2017 10:57:01 -0800 Subject: [PATCH 10/22] create separate instance definition when initializing --- .../GeneralizedLinearRegression.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 66b2ff463abe..52f5b1fbe45a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -289,20 +289,17 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val off = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) - val instances: RDD[OffsetInstance] = - dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { - case Row(label: Double, weight: Double, offset: Double, features: Vector) => - OffsetInstance(label, weight, offset, features) - } val model = if (familyObj == Gaussian && linkObj == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. - val wlsInstances: RDD[Instance] = instances.map { instance => - Instance(instance.label - instance.offset, instance.weight, instance.features) - } + val instances: RDD[Instance] = + dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { + case Row(label: Double, weight: Double, offset: Double, features: Vector) => + Instance(label - offset, weight, features) + } val optimizer = new WeightedLeastSquares($(fitIntercept), $(regParam), elasticNetParam = 0.0, standardizeFeatures = true, standardizeLabel = true) - val wlsModel = optimizer.fit(wlsInstances) + val wlsModel = optimizer.fit(instances) val model = copyValues( new GeneralizedLinearRegressionModel(uid, wlsModel.coefficients, wlsModel.intercept) .setParent(this)) @@ -310,6 +307,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val wlsModel.diagInvAtWA.toArray, 1, getSolver) model.setSummary(Some(trainingSummary)) } else { + val instances: RDD[OffsetInstance] = + dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { + case Row(label: Double, weight: Double, offset: Double, features: Vector) => + OffsetInstance(label, weight, offset, features) + } // Fit Generalized Linear Model by iteratively reweighted least squares (IRLS). val initialModel = familyAndLink.initialize(instances, $(fitIntercept), $(regParam)) val optimizer = new IterativelyReweightedLeastSquares(initialModel, From e183c08373733249d7b9c476875c2a4a2dff3c05 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Thu, 26 Jan 2017 11:08:11 -0800 Subject: [PATCH 11/22] fix style in test --- .../GeneralizedLinearRegression.scala | 6 +-- .../GeneralizedLinearRegressionSuite.scala | 44 +++++++++---------- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 52f5b1fbe45a..121ab2f78660 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -798,13 +798,13 @@ class GeneralizedLinearRegressionModel private[ml] ( override protected def transformImpl(dataset: Dataset[_]): DataFrame = { val predictUDF = udf { (features: Vector, offset: Double) => predict(features, offset) } val predictLinkUDF = udf { (features: Vector, offset: Double) => predictLink(features, offset) } - val off = if (!isSet(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) + val offset = if (!isSet(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) var output = dataset if ($(predictionCol).nonEmpty) { - output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), off)) + output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), offset)) } if (hasLinkPredictionCol) { - output = output.withColumn($(linkPredictionCol), predictLinkUDF(col($(featuresCol)), off)) + output = output.withColumn($(linkPredictionCol), predictLinkUDF(col($(featuresCol)), offset)) } output.toDF() } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index c90afc255e3b..a3cca159f0d3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -623,28 +623,28 @@ class GeneralizedLinearRegressionSuite var idx = 0 for (fitIntercept <- Seq(false, true)) { for (family <- Seq("gaussian", "poisson", "gamma")) { - val trainer = new GeneralizedLinearRegression().setFamily(family) - .setFitIntercept(fitIntercept).setOffsetCol("offset") - .setWeightCol("weight").setLinkPredictionCol("linkPrediction") - val model = trainer.fit(dataset) - val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) - assert(actual ~= expected(idx) absTol 1e-4, s"Model mismatch: GLM with family = $family," + - s" and fitIntercept = $fitIntercept.") - - val familyObj = Family.fromName(family) - val familyLink = new FamilyAndLink(familyObj, familyObj.defaultLink) - model.transform(dataset).select("features", "offset", "prediction", "linkPrediction") - .collect().foreach { - case Row(features: DenseVector, offset: Double, prediction1: Double, - linkPrediction1: Double) => - val eta = BLAS.dot(features, model.coefficients) + model.intercept + offset - val prediction2 = familyLink.fitted(eta) - val linkPrediction2 = eta - assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + - s"family = $family, and fitIntercept = $fitIntercept.") - assert(linkPrediction1 ~= linkPrediction2 relTol 1E-5, "Link Prediction mismatch: " + - s"GLM with family = $family, and fitIntercept = $fitIntercept.") - } + val trainer = new GeneralizedLinearRegression().setFamily(family) + .setFitIntercept(fitIntercept).setOffsetCol("offset") + .setWeightCol("weight").setLinkPredictionCol("linkPrediction") + val model = trainer.fit(dataset) + val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) + assert(actual ~= expected(idx) absTol 1e-4, s"Model mismatch: GLM with family = $family," + + s" and fitIntercept = $fitIntercept.") + + val familyObj = Family.fromName(family) + val familyLink = new FamilyAndLink(familyObj, familyObj.defaultLink) + model.transform(dataset).select("features", "offset", "prediction", "linkPrediction") + .collect().foreach { + case Row(features: DenseVector, offset: Double, prediction1: Double, + linkPrediction1: Double) => + val eta = BLAS.dot(features, model.coefficients) + model.intercept + offset + val prediction2 = familyLink.fitted(eta) + val linkPrediction2 = eta + assert(prediction1 ~= prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"family = $family, and fitIntercept = $fitIntercept.") + assert(linkPrediction1 ~= linkPrediction2 relTol 1E-5, "Link Prediction mismatch: " + + s"GLM with family = $family, and fitIntercept = $fitIntercept.") + } idx += 1 } From da4174a6c2639001828e587794e1a32dcf8db15d Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Fri, 27 Jan 2017 00:03:02 -0800 Subject: [PATCH 12/22] add test for tweedie --- .../GeneralizedLinearRegressionSuite.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index e34d906a6716..3f6eef49fed6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -746,12 +746,13 @@ class GeneralizedLinearRegressionSuite test("generalized linear regression with offset") { /* R code: + library(statmod) df <- as.data.frame(matrix(c( 1.0, 1.0, 2.0, 0.0, 5.0, 2.0, 2.0, 0.5, 1.0, 2.0, 1.0, 3.0, 1.0, 2.0, 1.0, 2.0, 4.0, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) - families <- c(gaussian, poisson, Gamma) + families <- list(gaussian, poisson, Gamma, tweedie(1.5)) f1 <- V1 ~ -1 + V4 + V5 f2 <- V1 ~ V4 + V5 for (f in c(f1, f2)) { @@ -764,9 +765,11 @@ class GeneralizedLinearRegressionSuite [1] 0.535040431 0.005390836 [1] 0.1968355 -0.2061711 [1] 0.307996 -0.153579 + [1] 0.32166185 -0.09698986 [1] -0.8800000 0.7342857 0.1714286 [1] -1.9991044 0.7247511 0.1424392 [1] -0.27378146 0.31599396 -0.06204946 + [1] -0.17118812 0.31200361 -0.02541656 */ val dataset = Seq( OffsetInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), @@ -779,18 +782,21 @@ class GeneralizedLinearRegressionSuite Vectors.dense(0.0, 0.535040431, 0.005390836), Vectors.dense(0.0, 0.1968355, -0.2061711), Vectors.dense(0.0, 0.307996, -0.153579), + Vectors.dense(0.0, 0.32166185, -0.09698986), Vectors.dense(-0.88, 0.7342857, 0.1714286), Vectors.dense(-1.9991044, 0.7247511, 0.1424392), - Vectors.dense(-0.27378146, 0.31599396, -0.06204946)) + Vectors.dense(-0.27378146, 0.31599396, -0.06204946), + Vectors.dense(-0.17118812, 0.31200361, -0.02541656)) import GeneralizedLinearRegression._ var idx = 0 for (fitIntercept <- Seq(false, true)) { - for (family <- Seq("gaussian", "poisson", "gamma")) { - val trainer = new GeneralizedLinearRegression().setFamily(family) + for (family <- Seq("gaussian", "poisson", "gamma", "tweedie")) { + var trainer = new GeneralizedLinearRegression().setFamily(family) .setFitIntercept(fitIntercept).setOffsetCol("offset") .setWeightCol("weight").setLinkPredictionCol("linkPrediction") + if (family == "tweedie") trainer = trainer.setVariancePower(1.5) val model = trainer.fit(dataset) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) assert(actual ~= expected(idx) absTol 1e-4, s"Model mismatch: GLM with family = $family," + From 52bc32b2d86b2cd5ce092f86ee61f8fe9aebec5d Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Fri, 27 Jan 2017 16:30:48 -0800 Subject: [PATCH 13/22] cast offset and add in instrumentation --- .../regression/GeneralizedLinearRegression.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 18774c615942..970bade0eb39 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -350,7 +350,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val numFeatures = dataset.select(col($(featuresCol))).first().getAs[Vector](0).size val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, weightCol, predictionCol, linkPredictionCol, + instr.logParams(labelCol, featuresCol, weightCol, offsetCol, predictionCol, linkPredictionCol, family, solver, fitIntercept, link, maxIter, regParam, tol) instr.logNumFeatures(numFeatures) @@ -361,7 +361,11 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val off = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) + val off = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) { + lit(0.0) + } else { + col($(offsetCol)).cast(DoubleType) + } val model = if (familyAndLink.family == Gaussian && familyAndLink.link == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. @@ -1002,7 +1006,11 @@ class GeneralizedLinearRegressionModel private[ml] ( override protected def transformImpl(dataset: Dataset[_]): DataFrame = { val predictUDF = udf { (features: Vector, offset: Double) => predict(features, offset) } val predictLinkUDF = udf { (features: Vector, offset: Double) => predictLink(features, offset) } - val offset = if (!isSet(offsetCol) || $(offsetCol).isEmpty) lit(0.0) else col($(offsetCol)) + val offset = if (!isSet(offsetCol) || $(offsetCol).isEmpty) { + lit(0.0) + } else { + col($(offsetCol)).cast(DoubleType) + } var output = dataset if ($(predictionCol).nonEmpty) { output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), offset)) From 59e10f77f2676a3755536a46d171ee723d226ee5 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Mon, 30 Jan 2017 09:30:44 -0800 Subject: [PATCH 14/22] update var name --- .../spark/ml/regression/GeneralizedLinearRegression.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 970bade0eb39..88083c560dc8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -361,7 +361,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val } val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val off = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) { + val offset = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) { lit(0.0) } else { col($(offsetCol)).cast(DoubleType) @@ -370,7 +370,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val val model = if (familyAndLink.family == Gaussian && familyAndLink.link == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. val instances: RDD[Instance] = - dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { + dataset.select(col($(labelCol)), w, offset, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, offset: Double, features: Vector) => Instance(label - offset, weight, features) } @@ -385,7 +385,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val model.setSummary(Some(trainingSummary)) } else { val instances: RDD[OffsetInstance] = - dataset.select(col($(labelCol)), w, off, col($(featuresCol))).rdd.map { + dataset.select(col($(labelCol)), w, offset, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, offset: Double, features: Vector) => OffsetInstance(label, weight, offset, features) } From 1d41bddea920cb8716ced0221afed81994c17d39 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 7 Feb 2017 21:18:38 -0800 Subject: [PATCH 15/22] add test for intercept only --- .../IterativelyReweightedLeastSquares.scala | 2 +- .../GeneralizedLinearRegressionSuite.scala | 65 +++++++++++++++++++ 2 files changed, 66 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index ff05a2eaa75f..6961b45f55e4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -89,7 +89,7 @@ private[ml] class IterativelyReweightedLeastSquares( val oldCoefficients = oldModel.coefficients val coefficients = model.coefficients BLAS.axpy(-1.0, coefficients, oldCoefficients) - val maxTolOfCoefficients = oldCoefficients.toArray.reduce { (x, y) => + val maxTolOfCoefficients = oldCoefficients.toArray.foldLeft(0.0) { (x, y) => math.max(math.abs(x), math.abs(y)) } val maxTol = math.max(maxTolOfCoefficients, math.abs(oldModel.intercept - model.intercept)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 3f6eef49fed6..a7e02f76b2c5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -821,6 +821,71 @@ class GeneralizedLinearRegressionSuite } } + test("generalized linear regression intercept only model with offset") { + /* + R code: + library(statmod) + df <- as.data.frame(matrix(c( + 1.0, 1.0, 2.0, 0.0, 5.0, + 2.0, 2.0, 0.5, 1.0, 2.0, + 1.0, 3.0, 1.0, 2.0, 1.0, + 2.0, 4.0, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) + families <- list(gaussian, poisson, Gamma, tweedie(1.5)) + f1 <- V1 ~ -1 + V4 + V5 + f2 <- V1 ~ V4 + V5 + for (f in c(f1, f2)) { + for (fam in families) { + model <- glm(f, df, family = fam, weights = V2, offset = V3) + print(as.vector(coef(model))) + } + } + + [1] 0.535040431 0.005390836 + [1] 0.1968355 -0.2061711 + [1] 0.307996 -0.153579 + [1] 0.32166185 -0.09698986 + [1] -0.8800000 0.7342857 0.1714286 + [1] -1.9991044 0.7247511 0.1424392 + [1] -0.27378146 0.31599396 -0.06204946 + [1] -0.17118812 0.31200361 -0.02541656 + */ + val dataset = Seq( + OffsetInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), + OffsetInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + ).toDF() + + val expected = Seq(1.0, -0.3559835, 0.3618836, 0.558434) + + import GeneralizedLinearRegression._ + + var idx = 0 + for (family <- Seq("gaussian", "poisson", "gamma", "tweedie")) { + var trainer = new GeneralizedLinearRegression().setFamily(family).setOffsetCol("offset") + .setWeightCol("weight").setLinkPredictionCol("linkPrediction") + if (family == "tweedie") trainer = trainer.setVariancePower(1.5) + val model = trainer.fit(dataset) + assert(model.intercept ~= expected(idx) absTol 1e-4, s"Model mismatch: " + + s"GLM with family = $family.") + + val familyLink = FamilyAndLink(trainer) + model.transform(dataset).select("features", "offset", "prediction", "linkPrediction") + .collect().foreach { + case Row(features: DenseVector, offset: Double, prediction1: Double, + linkPrediction1: Double) => + val eta = model.intercept + offset + val prediction2 = familyLink.fitted(eta) + val linkPrediction2 = eta + assert(prediction1 ~== prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + + s"family = $family.") + assert(linkPrediction1 ~== linkPrediction2 relTol 1E-5, "Link Prediction mismatch: " + + s"GLM with family = $family.") + } + idx += 1 + } + } + test("glm summary: gaussian family with weight") { /* R code: From fb372adc2482cd3add1978b69708f8a9a1dd750d Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Wed, 8 Feb 2017 13:23:07 -0800 Subject: [PATCH 16/22] update test --- .../ml/regression/GeneralizedLinearRegressionSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index a7e02f76b2c5..c054f42b8c57 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -850,10 +850,10 @@ class GeneralizedLinearRegressionSuite [1] -0.17118812 0.31200361 -0.02541656 */ val dataset = Seq( - OffsetInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), - OffsetInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), - OffsetInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), - OffsetInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + OffsetInstance(1.0, 1.0, 2.0, Vectors.zeros(0)), + OffsetInstance(2.0, 2.0, 0.5, Vectors.zeros(0)), + OffsetInstance(1.0, 3.0, 1.0, Vectors.zeros(0)), + OffsetInstance(2.0, 4.0, 0.0, Vectors.zeros(0)) ).toDF() val expected = Seq(1.0, -0.3559835, 0.3618836, 0.558434) From afb46438c744b08b53a712cb9387e71af17c366e Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Wed, 8 Feb 2017 22:54:26 -0800 Subject: [PATCH 17/22] implement null dev for offset model --- .../GeneralizedLinearRegression.scala | 109 ++++++++++-------- .../GeneralizedLinearRegressionSuite.scala | 1 - 2 files changed, 60 insertions(+), 50 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 78fa8abd6d2a..8483f00bbb85 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -25,7 +25,7 @@ import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.PredictorParams import org.apache.spark.ml.feature.{Instance, OffsetInstance} -import org.apache.spark.ml.linalg.{BLAS, Vector} +import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors} import org.apache.spark.ml.optim._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ @@ -179,9 +179,7 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam } val newSchema = super.validateAndTransformSchema(schema, fitting, featuresDataType) - if (isSet(offsetCol) && $(offsetCol).nonEmpty) { - SchemaUtils.checkNumericType(schema, $(offsetCol)) - } + if (isSetOffsetCol(this)) SchemaUtils.checkNumericType(schema, $(offsetCol)) if (hasLinkPredictionCol) { SchemaUtils.appendColumn(newSchema, $(linkPredictionCol), DoubleType) } else { @@ -364,12 +362,8 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val "GeneralizedLinearRegression was given data with 0 features, and with Param fitIntercept " + "set to false. To fit a model with 0 features, fitIntercept must be set to true." ) - val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) - val offset = if (!isDefined(offsetCol) || $(offsetCol).isEmpty) { - lit(0.0) - } else { - col($(offsetCol)).cast(DoubleType) - } + val w = if (!isSetWeightCol(this)) lit(1.0) else col($(weightCol)) + val offset = if (!isSetOffsetCol(this)) lit(0.0) else col($(offsetCol)).cast(DoubleType) val model = if (familyAndLink.family == Gaussian && familyAndLink.link == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. @@ -441,6 +435,14 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine private[regression] val epsilon: Double = 1E-16 + /** Checks whether weight column is set and nonempty */ + private[regression] def isSetWeightCol(params: GeneralizedLinearRegressionBase): Boolean = + params.isSet(params.weightCol) && !params.getWeightCol.isEmpty + + /** Checks whether offset column is set and nonempty */ + private[regression] def isSetOffsetCol(params: GeneralizedLinearRegressionBase): Boolean = + params.isSet(params.offsetCol) && !params.getOffsetCol.isEmpty + /** * Wrapper of family and link combination used in the model. */ @@ -979,7 +981,7 @@ class GeneralizedLinearRegressionModel private[ml] ( private lazy val familyAndLink = FamilyAndLink(this) override protected def predict(features: Vector): Double = { - if (!isSet(offsetCol) || $(offsetCol).isEmpty) { + if (!isSetOffsetCol(this)) { val eta = BLAS.dot(features, coefficients) + intercept familyAndLink.fitted(eta) } else { @@ -1010,11 +1012,7 @@ class GeneralizedLinearRegressionModel private[ml] ( override protected def transformImpl(dataset: Dataset[_]): DataFrame = { val predictUDF = udf { (features: Vector, offset: Double) => predict(features, offset) } val predictLinkUDF = udf { (features: Vector, offset: Double) => predictLink(features, offset) } - val offset = if (!isSet(offsetCol) || $(offsetCol).isEmpty) { - lit(0.0) - } else { - col($(offsetCol)).cast(DoubleType) - } + val offset = if (!isSetOffsetCol(this)) lit(0.0) else col($(offsetCol)).cast(DoubleType) var output = dataset if ($(predictionCol).nonEmpty) { output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), offset)) @@ -1191,9 +1189,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** Degrees of freedom. */ @Since("2.0.0") - lazy val degreesOfFreedom: Long = { - numInstances - rank - } + lazy val degreesOfFreedom: Long = numInstances - rank /** The residual degrees of freedom. */ @Since("2.0.0") @@ -1201,18 +1197,20 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** The residual degrees of freedom for the null model. */ @Since("2.0.0") - lazy val residualDegreeOfFreedomNull: Long = if (model.getFitIntercept) { - numInstances - 1 - } else { - numInstances + lazy val residualDegreeOfFreedomNull: Long = { + if (model.getFitIntercept) numInstances - 1 else numInstances } - private def weightCol: Column = { - if (!model.isDefined(model.weightCol) || model.getWeightCol.isEmpty) { - lit(1.0) - } else { - col(model.getWeightCol) - } + private def label: Column = col(model.getLabelCol).cast(DoubleType) + + private def prediction: Column = col(predictionCol) + + private def weight: Column = { + if (!isSetWeightCol(model)) lit(1.0) else col(model.getWeightCol) + } + + private def offset: Column = { + if (!isSetOffsetCol(model)) lit(0.0) else col(model.getOffsetCol).cast(DoubleType) } private[regression] lazy val devianceResiduals: DataFrame = { @@ -1220,25 +1218,23 @@ class GeneralizedLinearRegressionSummary private[regression] ( val r = math.sqrt(math.max(family.deviance(y, mu, weight), 0.0)) if (y > mu) r else -1.0 * r } - val w = weightCol predictions.select( - drUDF(col(model.getLabelCol), col(predictionCol), w).as("devianceResiduals")) + drUDF(label, prediction, weight).as("devianceResiduals")) } private[regression] lazy val pearsonResiduals: DataFrame = { val prUDF = udf { mu: Double => family.variance(mu) } - val w = weightCol - predictions.select(col(model.getLabelCol).minus(col(predictionCol)) - .multiply(sqrt(w)).divide(sqrt(prUDF(col(predictionCol)))).as("pearsonResiduals")) + predictions.select(label.minus(prediction) + .multiply(sqrt(weight)).divide(sqrt(prUDF(prediction))).as("pearsonResiduals")) } private[regression] lazy val workingResiduals: DataFrame = { val wrUDF = udf { (y: Double, mu: Double) => (y - mu) * link.deriv(mu) } - predictions.select(wrUDF(col(model.getLabelCol), col(predictionCol)).as("workingResiduals")) + predictions.select(wrUDF(label, prediction).as("workingResiduals")) } private[regression] lazy val responseResiduals: DataFrame = { - predictions.select(col(model.getLabelCol).minus(col(predictionCol)).as("responseResiduals")) + predictions.select(label.minus(prediction).as("responseResiduals")) } /** @@ -1270,16 +1266,33 @@ class GeneralizedLinearRegressionSummary private[regression] ( */ @Since("2.0.0") lazy val nullDeviance: Double = { - val w = weightCol - val wtdmu: Double = if (model.getFitIntercept) { - val agg = predictions.agg(sum(w.multiply(col(model.getLabelCol))), sum(w)).first() - agg.getDouble(0) / agg.getDouble(1) + val intercept: Double = if (!model.getFitIntercept) { + 0.0 } else { - link.unlink(0.0) + /* + Estimate intercept analytically when there is no offset, or when there is offset but + the model is Gaussian family with identity link. Otherwise, fit an intercept only model. + */ + if (!isSetOffsetCol(model) || + (isSetOffsetCol(model) && family == Gaussian && link == Identity)) { + val agg = predictions.agg(sum(weight.multiply( + label.minus(offset))), sum(weight)).first() + link.link(agg.getDouble(0) / agg.getDouble(1)) + } else { + val featureNull = "feature_" + java.util.UUID.randomUUID.toString + val glr = new GeneralizedLinearRegression().copy(model.extractParamMap) + .setFeaturesCol(featureNull) + val emptyVectorUDF = udf{ () => Vectors.zeros(0) } + glr.fit( + predictions.withColumn(featureNull, emptyVectorUDF()) + ).intercept + } } - predictions.select(col(model.getLabelCol).cast(DoubleType), w).rdd.map { - case Row(y: Double, weight: Double) => - family.deviance(y, wtdmu, weight) + + println("intercept is: " + intercept) + predictions.select(label, offset, weight).rdd.map { + case Row(y: Double, offset: Double, weight: Double) => + family.deviance(y, link.unlink(intercept + offset), weight) }.sum() } @@ -1288,8 +1301,7 @@ class GeneralizedLinearRegressionSummary private[regression] ( */ @Since("2.0.0") lazy val deviance: Double = { - val w = weightCol - predictions.select(col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map { + predictions.select(label, prediction, weight).rdd.map { case Row(label: Double, pred: Double, weight: Double) => family.deviance(label, pred, weight) }.sum() @@ -1314,10 +1326,9 @@ class GeneralizedLinearRegressionSummary private[regression] ( /** Akaike Information Criterion (AIC) for the fitted model. */ @Since("2.0.0") lazy val aic: Double = { - val w = weightCol - val weightSum = predictions.select(w).agg(sum(w)).first().getDouble(0) + val weightSum = predictions.select(weight).agg(sum(weight)).first().getDouble(0) val t = predictions.select( - col(model.getLabelCol).cast(DoubleType), col(predictionCol), w).rdd.map { + label, prediction, weight).rdd.map { case Row(label: Double, pred: Double, weight: Double) => (label, pred, weight) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 605e069c45ae..e0a8786311a4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -775,7 +775,6 @@ class GeneralizedLinearRegressionSuite import GeneralizedLinearRegression._ var idx = 0 - for (family <- Seq("gaussian", "poisson", "binomial", "gamma", "tweedie")) { for (useWeight <- Seq(false, true)) { val trainer = new GeneralizedLinearRegression().setFamily(family) From fc64d32e72e1e406d6f7ab27563ea622aa7b1397 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Thu, 9 Feb 2017 17:23:58 -0800 Subject: [PATCH 18/22] fix null deviance calculation and add tests --- .../GeneralizedLinearRegression.scala | 14 +- .../GeneralizedLinearRegressionSuite.scala | 670 ++++++++---------- 2 files changed, 314 insertions(+), 370 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 8483f00bbb85..b777544f5bfe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -1279,17 +1279,19 @@ class GeneralizedLinearRegressionSummary private[regression] ( label.minus(offset))), sum(weight)).first() link.link(agg.getDouble(0) / agg.getDouble(1)) } else { + // Create empty feature column and fit intercept only model using param setting from model val featureNull = "feature_" + java.util.UUID.randomUUID.toString - val glr = new GeneralizedLinearRegression().copy(model.extractParamMap) - .setFeaturesCol(featureNull) + val paramMap = model.extractParamMap() + paramMap.put(model.featuresCol, featureNull) + if (family.name != "tweedie") { + paramMap.remove(model.variancePower) + } val emptyVectorUDF = udf{ () => Vectors.zeros(0) } - glr.fit( - predictions.withColumn(featureNull, emptyVectorUDF()) + model.parent.fit( + dataset.withColumn(featureNull, emptyVectorUDF()), paramMap ).intercept } } - - println("intercept is: " + intercept) predictions.select(label, offset, weight).rdd.map { case Row(y: Double, offset: Double, weight: Double) => family.deviance(y, link.unlink(intercept + offset), weight) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index e0a8786311a4..f084d5325af0 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -802,12 +802,13 @@ class GeneralizedLinearRegressionSuite /* R code: library(statmod) + df <- as.data.frame(matrix(c( - 1.0, 1.0, 2.0, 0.0, 5.0, - 2.0, 2.0, 0.5, 1.0, 2.0, - 1.0, 3.0, 1.0, 2.0, 1.0, - 2.0, 4.0, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) - families <- list(gaussian, poisson, Gamma, tweedie(1.5)) + 0.2, 1.0, 2.0, 0.0, 5.0, + 0.5, 2.1, 0.5, 1.0, 2.0, + 0.9, 0.4, 1.0, 2.0, 1.0, + 0.7, 0.7, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) + families <- list(gaussian, binomial, poisson, Gamma, tweedie(1.5)) f1 <- V1 ~ -1 + V4 + V5 f2 <- V1 ~ V4 + V5 for (f in c(f1, f2)) { @@ -816,39 +817,42 @@ class GeneralizedLinearRegressionSuite print(as.vector(coef(model))) } } - - [1] 0.535040431 0.005390836 - [1] 0.1968355 -0.2061711 - [1] 0.307996 -0.153579 - [1] 0.32166185 -0.09698986 - [1] -0.8800000 0.7342857 0.1714286 - [1] -1.9991044 0.7247511 0.1424392 - [1] -0.27378146 0.31599396 -0.06204946 - [1] -0.17118812 0.31200361 -0.02541656 + [1] 0.5169222 -0.3344444 + [1] 0.9419107 -0.6864404 + [1] 0.1812436 -0.6568422 + [1] -0.2869094 0.7857710 + [1] 0.1055254 0.2979113 + [1] -0.05990345 0.53188982 -0.32118415 + [1] -0.2147117 0.9911750 -0.6356096 + [1] -1.5616130 0.6646470 -0.3192581 + [1] 0.3390397 -0.3406099 0.6870259 + [1] 0.3665034 0.1039416 0.1484616 */ val dataset = Seq( - OffsetInstance(1.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), - OffsetInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), - OffsetInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), - OffsetInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + OffsetInstance(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(0.5, 2.1, 0.5, Vectors.dense(1.0, 2.0)), + OffsetInstance(0.9, 0.4, 1.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(0.7, 0.7, 0.0, Vectors.dense(3.0, 3.0)) ).toDF() val expected = Seq( - Vectors.dense(0.0, 0.535040431, 0.005390836), - Vectors.dense(0.0, 0.1968355, -0.2061711), - Vectors.dense(0.0, 0.307996, -0.153579), - Vectors.dense(0.0, 0.32166185, -0.09698986), - Vectors.dense(-0.88, 0.7342857, 0.1714286), - Vectors.dense(-1.9991044, 0.7247511, 0.1424392), - Vectors.dense(-0.27378146, 0.31599396, -0.06204946), - Vectors.dense(-0.17118812, 0.31200361, -0.02541656)) + Vectors.dense(0, 0.5169222, -0.3344444), + Vectors.dense(0, 0.9419107, -0.6864404), + Vectors.dense(0, 0.1812436, -0.6568422), + Vectors.dense(0, -0.2869094, 0.785771), + Vectors.dense(0, 0.1055254, 0.2979113), + Vectors.dense(-0.05990345, 0.53188982, -0.32118415), + Vectors.dense(-0.2147117, 0.991175, -0.6356096), + Vectors.dense(-1.561613, 0.664647, -0.3192581), + Vectors.dense(0.3390397, -0.3406099, 0.6870259), + Vectors.dense(0.3665034, 0.1039416, 0.1484616)) import GeneralizedLinearRegression._ var idx = 0 for (fitIntercept <- Seq(false, true)) { - for (family <- Seq("gaussian", "poisson", "gamma", "tweedie")) { + for (family <- Seq("gaussian", "binomial", "poisson", "gamma", "tweedie")) { var trainer = new GeneralizedLinearRegression().setFamily(family) .setFitIntercept(fitIntercept).setOffsetCol("offset") .setWeightCol("weight").setLinkPredictionCol("linkPrediction") @@ -877,142 +881,77 @@ class GeneralizedLinearRegressionSuite } } - test("generalized linear regression intercept only model with offset") { + test("glm summary: gaussian family with weight and offset") { /* R code: - library(statmod) - df <- as.data.frame(matrix(c( - 1.0, 1.0, 2.0, 0.0, 5.0, - 2.0, 2.0, 0.5, 1.0, 2.0, - 1.0, 3.0, 1.0, 2.0, 1.0, - 2.0, 4.0, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) - families <- list(gaussian, poisson, Gamma, tweedie(1.5)) - f1 <- V1 ~ -1 + V4 + V5 - f2 <- V1 ~ V4 + V5 - for (f in c(f1, f2)) { - for (fam in families) { - model <- glm(f, df, family = fam, weights = V2, offset = V3) - print(as.vector(coef(model))) - } - } - - [1] 0.535040431 0.005390836 - [1] 0.1968355 -0.2061711 - [1] 0.307996 -0.153579 - [1] 0.32166185 -0.09698986 - [1] -0.8800000 0.7342857 0.1714286 - [1] -1.9991044 0.7247511 0.1424392 - [1] -0.27378146 0.31599396 -0.06204946 - [1] -0.17118812 0.31200361 -0.02541656 - */ - val dataset = Seq( - OffsetInstance(1.0, 1.0, 2.0, Vectors.zeros(0)), - OffsetInstance(2.0, 2.0, 0.5, Vectors.zeros(0)), - OffsetInstance(1.0, 3.0, 1.0, Vectors.zeros(0)), - OffsetInstance(2.0, 4.0, 0.0, Vectors.zeros(0)) - ).toDF() - - val expected = Seq(1.0, -0.3559835, 0.3618836, 0.558434) - - import GeneralizedLinearRegression._ - - var idx = 0 - for (family <- Seq("gaussian", "poisson", "gamma", "tweedie")) { - var trainer = new GeneralizedLinearRegression().setFamily(family).setOffsetCol("offset") - .setWeightCol("weight").setLinkPredictionCol("linkPrediction") - if (family == "tweedie") trainer = trainer.setVariancePower(1.5) - val model = trainer.fit(dataset) - assert(model.intercept ~= expected(idx) absTol 1e-4, s"Model mismatch: " + - s"GLM with family = $family.") - - val familyLink = FamilyAndLink(trainer) - model.transform(dataset).select("features", "offset", "prediction", "linkPrediction") - .collect().foreach { - case Row(features: DenseVector, offset: Double, prediction1: Double, - linkPrediction1: Double) => - val eta = model.intercept + offset - val prediction2 = familyLink.fitted(eta) - val linkPrediction2 = eta - assert(prediction1 ~== prediction2 relTol 1E-5, "Prediction mismatch: GLM with " + - s"family = $family.") - assert(linkPrediction1 ~== linkPrediction2 relTol 1E-5, "Link Prediction mismatch: " + - s"GLM with family = $family.") - } - idx += 1 - } - } - test("glm summary: gaussian family with weight") { - /* - R code: - - A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) - b <- c(17, 19, 23, 29) - w <- c(1, 2, 3, 4) - df <- as.data.frame(cbind(A, b)) + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b <- c(17, 19, 23, 29) + w <- c(1, 2, 3, 4) + off <- c(2, 3, 1, 4) + df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = Seq( - Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(19.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(23.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(29.0, 4.0, Vectors.dense(3.0, 13.0)) + val dataset = Seq( + OffsetInstance(17.0, 1.0, 2.0, Vectors.dense(0.0, 5.0).toSparse), + OffsetInstance(19.0, 2.0, 3.0, Vectors.dense(1.0, 7.0)), + OffsetInstance(23.0, 3.0, 1.0, Vectors.dense(2.0, 11.0)), + OffsetInstance(29.0, 4.0, 4.0, Vectors.dense(3.0, 13.0)) ).toDF() /* - R code: - - model <- glm(formula = "b ~ .", family="gaussian", data = df, weights = w) - summary(model) + R code: - Deviance Residuals: - 1 2 3 4 - 1.920 -1.358 -1.109 0.960 + model <- glm(formula = "b ~ .", family = "gaussian", data = df, + weights = w, offset = off) + summary(model) - Coefficients: - Estimate Std. Error t value Pr(>|t|) - (Intercept) 18.080 9.608 1.882 0.311 - V1 6.080 5.556 1.094 0.471 - V2 -0.600 1.960 -0.306 0.811 + Deviance Residuals: + 1 2 3 4 + 0.9600 -0.6788 -0.5543 0.4800 - (Dispersion parameter for gaussian family taken to be 7.68) + Coefficients: + Estimate Std. Error t value Pr(>|t|) + (Intercept) 5.5400 4.8040 1.153 0.455 + V1 -0.9600 2.7782 -0.346 0.788 + V2 1.7000 0.9798 1.735 0.333 - Null deviance: 202.00 on 3 degrees of freedom - Residual deviance: 7.68 on 1 degrees of freedom - AIC: 18.783 + (Dispersion parameter for gaussian family taken to be 1.92) - Number of Fisher Scoring iterations: 2 + Null deviance: 152.10 on 3 degrees of freedom + Residual deviance: 1.92 on 1 degrees of freedom + AIC: 13.238 - residuals(model, type="pearson") - 1 2 3 4 - 1.920000 -1.357645 -1.108513 0.960000 + Number of Fisher Scoring iterations: 2 - residuals(model, type="working") + residuals(model, type = "pearson") + 1 2 3 4 + 0.9600000 -0.6788225 -0.5542563 0.4800000 + residuals(model, type = "working") 1 2 3 4 - 1.92 -0.96 -0.64 0.48 - - residuals(model, type="response") + 0.96 -0.48 -0.32 0.24 + residuals(model, type = "response") 1 2 3 4 - 1.92 -0.96 -0.64 0.48 + 0.96 -0.48 -0.32 0.24 */ val trainer = new GeneralizedLinearRegression() - .setWeightCol("weight") + .setWeightCol("weight").setOffsetCol("offset") - val model = trainer.fit(datasetWithWeight) - - val coefficientsR = Vectors.dense(Array(6.080, -0.600)) - val interceptR = 18.080 - val devianceResidualsR = Array(1.920, -1.358, -1.109, 0.960) - val pearsonResidualsR = Array(1.920000, -1.357645, -1.108513, 0.960000) - val workingResidualsR = Array(1.92, -0.96, -0.64, 0.48) - val responseResidualsR = Array(1.92, -0.96, -0.64, 0.48) - val seCoefR = Array(5.556, 1.960, 9.608) - val tValsR = Array(1.094, -0.306, 1.882) - val pValsR = Array(0.471, 0.811, 0.311) - val dispersionR = 7.68 - val nullDevianceR = 202.00 - val residualDevianceR = 7.68 + val model = trainer.fit(dataset) + + val coefficientsR = Vectors.dense(Array(-0.96, 1.7)) + val interceptR = 5.54 + val devianceResidualsR = Array(0.96, -0.67882, -0.55426, 0.48) + val pearsonResidualsR = Array(0.96, -0.67882, -0.55426, 0.48) + val workingResidualsR = Array(0.96, -0.48, -0.32, 0.24) + val responseResidualsR = Array(0.96, -0.48, -0.32, 0.24) + val seCoefR = Array(2.7782, 0.9798, 4.804) + val tValsR = Array(-0.34555, 1.73506, 1.15321) + val pValsR = Array(0.78819, 0.33286, 0.45478) + val dispersionR = 1.92 + val nullDevianceR = 152.1 + val residualDevianceR = 1.92 val residualDegreeOfFreedomNullR = 3 val residualDegreeOfFreedomR = 1 - val aicR = 18.783 + val aicR = 13.23758 assert(model.hasSummary) val summary = model.summary @@ -1057,7 +996,7 @@ class GeneralizedLinearRegressionSuite assert(summary.aic ~== aicR absTol 1E-3) assert(summary.solver === "irls") - val summary2: GeneralizedLinearRegressionSummary = model.evaluate(datasetWithWeight) + val summary2: GeneralizedLinearRegressionSummary = model.evaluate(dataset) assert(summary.predictions.columns.toSet === summary2.predictions.columns.toSet) assert(summary.predictionCol === summary2.predictionCol) assert(summary.rank === summary2.rank) @@ -1070,79 +1009,79 @@ class GeneralizedLinearRegressionSuite assert(summary.aic === summary2.aic) } - test("glm summary: binomial family with weight") { + test("glm summary: binomial family with weight and offset") { /* - R code: + R code: - A <- matrix(c(0, 1, 2, 3, 5, 2, 1, 3), 4, 2) - b <- c(1, 0.5, 1, 0) - w <- c(1, 2.0, 0.3, 4.7) - df <- as.data.frame(cbind(A, b)) + df <- as.data.frame(matrix(c( + 0.2, 1.0, 2.0, 0.0, 5.0, + 0.5, 2.1, 0.5, 1.0, 2.0, + 0.9, 0.4, 1.0, 2.0, 1.0, + 0.7, 0.7, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) */ - val datasetWithWeight = Seq( - Instance(1.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(0.5, 2.0, Vectors.dense(1.0, 2.0)), - Instance(1.0, 0.3, Vectors.dense(2.0, 1.0)), - Instance(0.0, 4.7, Vectors.dense(3.0, 3.0)) + val dataset = Seq( + OffsetInstance(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(0.5, 2.1, 0.5, Vectors.dense(1.0, 2.0)), + OffsetInstance(0.9, 0.4, 1.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(0.7, 0.7, 0.0, Vectors.dense(3.0, 3.0)) ).toDF() - /* - R code: - - model <- glm(formula = "b ~ . -1", family="binomial", data = df, weights = w) - summary(model) - - Deviance Residuals: - 1 2 3 4 - 0.2404 0.1965 1.2824 -0.6916 + R code: - Coefficients: - Estimate Std. Error z value Pr(>|z|) - x1 -1.6901 1.2764 -1.324 0.185 - x2 0.7059 0.9449 0.747 0.455 + model <- glm(formula = "V1 ~ V4 + V5", family = "binomial", data = df, + weights = V2, offset = V3) + summary(model) - (Dispersion parameter for binomial family taken to be 1) + Deviance Residuals: + 1 2 3 4 + 0.002584 -0.003800 0.012478 -0.001796 - Null deviance: 8.3178 on 4 degrees of freedom - Residual deviance: 2.2193 on 2 degrees of freedom - AIC: 5.9915 + Coefficients: + Estimate Std. Error z value Pr(>|z|) + (Intercept) -0.2147 3.5687 -0.060 0.952 + V4 0.9912 1.2344 0.803 0.422 + V5 -0.6356 0.9669 -0.657 0.511 - Number of Fisher Scoring iterations: 5 + (Dispersion parameter for binomial family taken to be 1) - residuals(model, type="pearson") - 1 2 3 4 - 0.171217 0.197406 2.085864 -0.495332 + Null deviance: 2.17560881 on 3 degrees of freedom + Residual deviance: 0.00018005 on 1 degrees of freedom + AIC: 10.245 - residuals(model, type="working") - 1 2 3 4 - 1.029315 0.281881 15.502768 -1.052203 + Number of Fisher Scoring iterations: 4 - residuals(model, type="response") - 1 2 3 4 - 0.028480 0.069123 0.935495 -0.049613 + residuals(model, type = "pearson") + 1 2 3 4 + 0.002586113 -0.003799744 0.012372235 -0.001796892 + residuals(model, type = "working") + 1 2 3 4 + 0.006477857 -0.005244163 0.063541250 -0.004691064 + residuals(model, type = "response") + 1 2 3 4 + 0.0010324375 -0.0013110318 0.0060225522 -0.0009832738 */ val trainer = new GeneralizedLinearRegression() .setFamily("Binomial") .setWeightCol("weight") - .setFitIntercept(false) - - val model = trainer.fit(datasetWithWeight) - - val coefficientsR = Vectors.dense(Array(-1.690134, 0.705929)) - val interceptR = 0.0 - val devianceResidualsR = Array(0.2404, 0.1965, 1.2824, -0.6916) - val pearsonResidualsR = Array(0.171217, 0.197406, 2.085864, -0.495332) - val workingResidualsR = Array(1.029315, 0.281881, 15.502768, -1.052203) - val responseResidualsR = Array(0.02848, 0.069123, 0.935495, -0.049613) - val seCoefR = Array(1.276417, 0.944934) - val tValsR = Array(-1.324124, 0.747068) - val pValsR = Array(0.185462, 0.455023) - val dispersionR = 1.0 - val nullDevianceR = 8.3178 - val residualDevianceR = 2.2193 - val residualDegreeOfFreedomNullR = 4 - val residualDegreeOfFreedomR = 2 - val aicR = 5.991537 + .setOffsetCol("offset") + + val model = trainer.fit(dataset) + + val coefficientsR = Vectors.dense(Array(0.99117, -0.63561)) + val interceptR = -0.21471 + val devianceResidualsR = Array(0.00258, -0.0038, 0.01248, -0.0018) + val pearsonResidualsR = Array(0.00259, -0.0038, 0.01237, -0.0018) + val workingResidualsR = Array(0.00648, -0.00524, 0.06354, -0.00469) + val responseResidualsR = Array(0.00103, -0.00131, 0.00602, -0.00098) + val seCoefR = Array(1.23439, 0.9669, 3.56866) + val tValsR = Array(0.80297, -0.65737, -0.06017) + val pValsR = Array(0.42199, 0.51094, 0.95202) + val dispersionR = 1 + val nullDevianceR = 2.17561 + val residualDevianceR = 0.00018 + val residualDegreeOfFreedomNullR = 3 + val residualDegreeOfFreedomR = 1 + val aicR = 10.24453 val summary = model.summary val devianceResiduals = summary.residuals() @@ -1185,81 +1124,79 @@ class GeneralizedLinearRegressionSuite assert(summary.solver === "irls") } - test("glm summary: poisson family with weight") { + test("glm summary: poisson family with weight and offset") { /* - R code: + R code: - A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) - b <- c(2, 8, 3, 9) - w <- c(1, 2, 3, 4) - df <- as.data.frame(cbind(A, b)) + A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) + b <- c(2, 8, 3, 9) + w <- c(1, 2, 3, 4) + off <- c(2, 3, 1, 4) + df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = Seq( - Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) + val dataset = Seq( + OffsetInstance(2.0, 1.0, 2.0, Vectors.dense(0.0, 5.0).toSparse), + OffsetInstance(8.0, 2.0, 3.0, Vectors.dense(1.0, 7.0)), + OffsetInstance(3.0, 3.0, 1.0, Vectors.dense(2.0, 11.0)), + OffsetInstance(9.0, 4.0, 4.0, Vectors.dense(3.0, 13.0)) ).toDF() /* - R code: - - model <- glm(formula = "b ~ .", family="poisson", data = df, weights = w) - summary(model) - - Deviance Residuals: - 1 2 3 4 - -0.28952 0.11048 0.14839 -0.07268 - - Coefficients: - Estimate Std. Error z value Pr(>|z|) - (Intercept) 6.2999 1.6086 3.916 8.99e-05 *** - V1 3.3241 1.0184 3.264 0.00110 ** - V2 -1.0818 0.3522 -3.071 0.00213 ** - --- - Signif. codes: 0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1 - - (Dispersion parameter for poisson family taken to be 1) - - Null deviance: 15.38066 on 3 degrees of freedom - Residual deviance: 0.12333 on 1 degrees of freedom - AIC: 41.803 - - Number of Fisher Scoring iterations: 3 + R code: - residuals(model, type="pearson") - 1 2 3 4 - -0.28043145 0.11099310 0.14963714 -0.07253611 + model <- glm(formula = "b ~ .", family = "poisson", data = df, + weights = w, offset = off) + summary(model) - residuals(model, type="working") - 1 2 3 4 - -0.17960679 0.02813593 0.05113852 -0.01201650 + Deviance Residuals: + 1 2 3 4 + -2.0480 1.2315 1.8293 -0.7107 - residuals(model, type="response") - 1 2 3 4 - -0.4378554 0.2189277 0.1459518 -0.1094638 + Coefficients: + Estimate Std. Error z value Pr(>|z|) + (Intercept) -4.5678 1.9625 -2.328 0.0199 + V1 -2.8784 1.1683 -2.464 0.0137 + V2 0.8859 0.4170 2.124 0.0336 + + (Dispersion parameter for poisson family taken to be 1) + + Null deviance: 22.5585 on 3 degrees of freedom + Residual deviance: 9.5622 on 1 degrees of freedom + AIC: 51.242 + + Number of Fisher Scoring iterations: 5 + + residuals(model, type = "pearson") + 1 2 3 4 + -1.7480418 1.3037611 2.0750099 -0.6972966 + residuals(model, type = "working") + 1 2 3 4 + -0.6891489 0.3833588 0.9710682 -0.1096590 + residuals(model, type = "response") + 1 2 3 4 + -4.433948 2.216974 1.477983 -1.108487 */ val trainer = new GeneralizedLinearRegression() .setFamily("Poisson") .setWeightCol("weight") - .setFitIntercept(true) - - val model = trainer.fit(datasetWithWeight) - - val coefficientsR = Vectors.dense(Array(3.3241, -1.0818)) - val interceptR = 6.2999 - val devianceResidualsR = Array(-0.28952, 0.11048, 0.14839, -0.07268) - val pearsonResidualsR = Array(-0.28043145, 0.11099310, 0.14963714, -0.07253611) - val workingResidualsR = Array(-0.17960679, 0.02813593, 0.05113852, -0.01201650) - val responseResidualsR = Array(-0.4378554, 0.2189277, 0.1459518, -0.1094638) - val seCoefR = Array(1.0184, 0.3522, 1.6086) - val tValsR = Array(3.264, -3.071, 3.916) - val pValsR = Array(0.00110, 0.00213, 0.00009) - val dispersionR = 1.0 - val nullDevianceR = 15.38066 - val residualDevianceR = 0.12333 + .setOffsetCol("offset") + + val model = trainer.fit(dataset) + + val coefficientsR = Vectors.dense(Array(-2.87843, 0.88589)) + val interceptR = -4.56784 + val devianceResidualsR = Array(-2.04796, 1.23149, 1.82933, -0.71066) + val pearsonResidualsR = Array(-1.74804, 1.30376, 2.07501, -0.6973) + val workingResidualsR = Array(-0.68915, 0.38336, 0.97107, -0.10966) + val responseResidualsR = Array(-4.43395, 2.21697, 1.47798, -1.10849) + val seCoefR = Array(1.16826, 0.41703, 1.96249) + val tValsR = Array(-2.46387, 2.12428, -2.32757) + val pValsR = Array(0.01374, 0.03365, 0.01993) + val dispersionR = 1 + val nullDevianceR = 22.55853 + val residualDevianceR = 9.5622 val residualDegreeOfFreedomNullR = 3 val residualDegreeOfFreedomR = 1 - val aicR = 41.803 + val aicR = 51.24218 val summary = model.summary val devianceResiduals = summary.residuals() @@ -1302,78 +1239,79 @@ class GeneralizedLinearRegressionSuite assert(summary.solver === "irls") } - test("glm summary: gamma family with weight") { + test("glm summary: gamma family with weight and offset") { /* - R code: + R code: - A <- matrix(c(0, 1, 2, 3, 5, 7, 11, 13), 4, 2) - b <- c(2, 8, 3, 9) - w <- c(1, 2, 3, 4) - df <- as.data.frame(cbind(A, b)) + A <- matrix(c(0, 5, 1, 2, 2, 1, 3, 3), 4, 2, byrow = TRUE) + b <- c(1, 2, 1, 2) + w <- c(1, 2, 3, 4) + off <- c(0, 0.5, 1, 0) + df <- as.data.frame(cbind(A, b)) */ - val datasetWithWeight = Seq( - Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) + val dataset = Seq( + OffsetInstance(1.0, 1.0, 0.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(2.0, 2.0, 0.5, Vectors.dense(1.0, 2.0)), + OffsetInstance(1.0, 3.0, 1.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(2.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) ).toDF() /* - R code: - - model <- glm(formula = "b ~ .", family="Gamma", data = df, weights = w) - summary(model) + R code: - Deviance Residuals: - 1 2 3 4 - -0.26343 0.05761 0.12818 -0.03484 + model <- glm(formula = "b ~ .", family = "Gamma", data = df, + weights = w, offset = off) + summary(model) - Coefficients: - Estimate Std. Error t value Pr(>|t|) - (Intercept) -0.81511 0.23449 -3.476 0.178 - V1 -0.72730 0.16137 -4.507 0.139 - V2 0.23894 0.05481 4.359 0.144 + Deviance Residuals: + 1 2 3 4 + -0.17095 0.19867 -0.23604 0.03241 - (Dispersion parameter for Gamma family taken to be 0.07986091) + Coefficients: + Estimate Std. Error t value Pr(>|t|) + (Intercept) -0.56474 0.23866 -2.366 0.255 + V1 0.07695 0.06931 1.110 0.467 + V2 0.28068 0.07320 3.835 0.162 - Null deviance: 2.937462 on 3 degrees of freedom - Residual deviance: 0.090358 on 1 degrees of freedom - AIC: 23.202 + (Dispersion parameter for Gamma family taken to be 0.1212174) - Number of Fisher Scoring iterations: 4 + Null deviance: 2.02568 on 3 degrees of freedom + Residual deviance: 0.12546 on 1 degrees of freedom + AIC: 0.93388 - residuals(model, type="pearson") - 1 2 3 4 - -0.24082508 0.05839241 0.13135766 -0.03463621 + Number of Fisher Scoring iterations: 4 - residuals(model, type="working") + residuals(model, type = "pearson") + 1 2 3 4 + -0.16134949 0.20807694 -0.22544551 0.03258777 + residuals(model, type = "working") 1 2 3 4 - 0.091414181 -0.005374314 -0.027196998 0.001890910 - - residuals(model, type="response") - 1 2 3 4 - -0.6344390 0.3172195 0.2114797 -0.1586097 + 0.135315831 -0.084390309 0.113219135 -0.008279688 + residuals(model, type = "response") + 1 2 3 4 + -0.1923918 0.2565224 -0.1496381 0.0320653 */ val trainer = new GeneralizedLinearRegression() .setFamily("Gamma") .setWeightCol("weight") + .setOffsetCol("offset") + + val model = trainer.fit(dataset) - val model = trainer.fit(datasetWithWeight) - - val coefficientsR = Vectors.dense(Array(-0.72730, 0.23894)) - val interceptR = -0.81511 - val devianceResidualsR = Array(-0.26343, 0.05761, 0.12818, -0.03484) - val pearsonResidualsR = Array(-0.24082508, 0.05839241, 0.13135766, -0.03463621) - val workingResidualsR = Array(0.091414181, -0.005374314, -0.027196998, 0.001890910) - val responseResidualsR = Array(-0.6344390, 0.3172195, 0.2114797, -0.1586097) - val seCoefR = Array(0.16137, 0.05481, 0.23449) - val tValsR = Array(-4.507, 4.359, -3.476) - val pValsR = Array(0.139, 0.144, 0.178) - val dispersionR = 0.07986091 - val nullDevianceR = 2.937462 - val residualDevianceR = 0.090358 + val coefficientsR = Vectors.dense(Array(0.07695, 0.28068)) + val interceptR = -0.56474 + val devianceResidualsR = Array(-0.17095, 0.19867, -0.23604, 0.03241) + val pearsonResidualsR = Array(-0.16135, 0.20808, -0.22545, 0.03259) + val workingResidualsR = Array(0.13532, -0.08439, 0.11322, -0.00828) + val responseResidualsR = Array(-0.19239, 0.25652, -0.14964, 0.03207) + val seCoefR = Array(0.06931, 0.0732, 0.23866) + val tValsR = Array(1.11031, 3.83453, -2.3663) + val pValsR = Array(0.46675, 0.16241, 0.25454) + val dispersionR = 0.12122 + val nullDevianceR = 2.02568 + val residualDevianceR = 0.12546 val residualDegreeOfFreedomNullR = 3 val residualDegreeOfFreedomR = 1 - val aicR = 23.202 + val aicR = 0.93388 val summary = model.summary val devianceResiduals = summary.residuals() @@ -1416,77 +1354,81 @@ class GeneralizedLinearRegressionSuite assert(summary.solver === "irls") } - test("glm summary: tweedie family with weight") { + test("glm summary: tweedie family with weight and offset") { /* R code: - library(statmod) df <- as.data.frame(matrix(c( - 1.0, 1.0, 0.0, 5.0, - 0.5, 2.0, 1.0, 2.0, - 1.0, 3.0, 2.0, 1.0, - 0.0, 4.0, 3.0, 3.0), 4, 4, byrow = TRUE)) + 1.0, 1.0, 1.0, 0.0, 5.0, + 0.5, 2.0, 3.0, 1.0, 2.0, + 1.0, 3.0, 2.0, 2.0, 1.0, + 0.0, 4.0, 0.0, 3.0, 3.0), 4, 5, byrow = TRUE)) + */ + val dataset = Seq( + OffsetInstance(1.0, 1.0, 1.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(0.5, 2.0, 3.0, Vectors.dense(1.0, 2.0)), + OffsetInstance(1.0, 3.0, 2.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(0.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + ).toDF() + /* + R code: - model <- glm(V1 ~ -1 + V3 + V4, data = df, weights = V2, - family = tweedie(var.power = 1.6, link.power = 0)) + library(statmod) + model <- glm(V1 ~ V4 + V5, data = df, weights = V2, offset = V3, + family = tweedie(var.power = 1.6, link.power = 0.0)) summary(model) Deviance Residuals: 1 2 3 4 - 0.6210 -0.0515 1.6935 -3.2539 + 0.8917 -2.1396 1.2252 -1.7946 Coefficients: - Estimate Std. Error t value Pr(>|t|) - V3 -0.4087 0.5205 -0.785 0.515 - V4 -0.1212 0.4082 -0.297 0.794 + Estimate Std. Error t value Pr(>|t|) + (Intercept) -0.03047 3.65000 -0.008 0.995 + V4 -1.14577 1.41674 -0.809 0.567 + V5 -0.36585 0.97065 -0.377 0.771 - (Dispersion parameter for Tweedie family taken to be 3.830036) + (Dispersion parameter for Tweedie family taken to be 6.334961) - Null deviance: 20.702 on 4 degrees of freedom - Residual deviance: 13.844 on 2 degrees of freedom + Null deviance: 12.784 on 3 degrees of freedom + Residual deviance: 10.095 on 1 degrees of freedom AIC: NA - Number of Fisher Scoring iterations: 11 - - residuals(model, type="pearson") - 1 2 3 4 - 0.7383616 -0.0509458 2.2348337 -1.4552090 - residuals(model, type="working") - 1 2 3 4 - 0.83354150 -0.04103552 1.55676369 -1.00000000 - residuals(model, type="response") - 1 2 3 4 - 0.45460738 -0.02139574 0.60888055 -0.20392801 + Number of Fisher Scoring iterations: 18 + + residuals(model, type = "pearson") + 1 2 3 4 + 1.1472554 -1.4642569 1.4935199 -0.8025842 + residuals(model, type = "working") + 1 2 3 4 + 1.3624928 -0.8322375 0.9894580 -1.0000000 + residuals(model, type = "response") + 1 2 3 4 + 0.57671828 -2.48040354 0.49735052 -0.01040646 */ - val datasetWithWeight = Seq( - Instance(1.0, 1.0, Vectors.dense(0.0, 5.0)), - Instance(0.5, 2.0, Vectors.dense(1.0, 2.0)), - Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)), - Instance(0.0, 4.0, Vectors.dense(3.0, 3.0)) - ).toDF() - val trainer = new GeneralizedLinearRegression() .setFamily("tweedie") .setVariancePower(1.6) .setLinkPower(0.0) .setWeightCol("weight") - .setFitIntercept(false) - - val model = trainer.fit(datasetWithWeight) - val coefficientsR = Vectors.dense(Array(-0.408746, -0.12125)) - val interceptR = 0.0 - val devianceResidualsR = Array(0.621047, -0.051515, 1.693473, -3.253946) - val pearsonResidualsR = Array(0.738362, -0.050946, 2.234834, -1.455209) - val workingResidualsR = Array(0.833541, -0.041036, 1.556764, -1.0) - val responseResidualsR = Array(0.454607, -0.021396, 0.608881, -0.203928) - val seCoefR = Array(0.520519, 0.408215) - val tValsR = Array(-0.785267, -0.297024) - val pValsR = Array(0.514549, 0.794457) - val dispersionR = 3.830036 - val nullDevianceR = 20.702 - val residualDevianceR = 13.844 - val residualDegreeOfFreedomNullR = 4 - val residualDegreeOfFreedomR = 2 + .setOffsetCol("offset") + + val model = trainer.fit(dataset) + + val coefficientsR = Vectors.dense(Array(-1.14577, -0.36585)) + val interceptR = -0.03047 + val devianceResidualsR = Array(0.89171, -2.13961, 1.2252, -1.79463) + val pearsonResidualsR = Array(1.14726, -1.46426, 1.49352, -0.80258) + val workingResidualsR = Array(1.36249, -0.83224, 0.98946, -1) + val responseResidualsR = Array(0.57672, -2.4804, 0.49735, -0.01041) + val seCoefR = Array(1.41674, 0.97065, 3.65) + val tValsR = Array(-0.80873, -0.37691, -0.00835) + val pValsR = Array(0.56707, 0.77053, 0.99468) + val dispersionR = 6.33496 + val nullDevianceR = 12.78358 + val residualDevianceR = 10.09488 + val residualDegreeOfFreedomNullR = 3 + val residualDegreeOfFreedomR = 1 val summary = model.summary From 90d68a67815aceae63eaad7345477a082bb2febd Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Mon, 13 Feb 2017 23:41:37 -0800 Subject: [PATCH 19/22] allow missing offset in prediction --- .../apache/spark/ml/feature/Instance.scala | 7 ++-- .../GeneralizedLinearRegression.scala | 32 +++++++++++++------ .../GeneralizedLinearRegressionSuite.scala | 29 +++++++++++++++-- 3 files changed, 54 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index ba9224663cf4..4bdb6a0d14a2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -37,8 +37,11 @@ private[ml] case class Instance(label: Double, weight: Double, features: Vector) * @param offset The offset used for this data point. * @param features The vector of features for this data point. */ -private[ml] case class OffsetInstance(label: Double, weight: Double, offset: Double, - features: Vector) { +private[ml] case class OffsetInstance( + label: Double, + weight: Double, + offset: Double, + features: Vector) { /** Constructs from an [[Instance]] object and offset */ def this(instance: Instance, offset: Double = 0.0) = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index b777544f5bfe..b486af643c7c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -179,7 +179,9 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam } val newSchema = super.validateAndTransformSchema(schema, fitting, featuresDataType) - if (isSetOffsetCol(this)) SchemaUtils.checkNumericType(schema, $(offsetCol)) + if (fitting) { + if (isSetOffsetCol(this)) SchemaUtils.checkNumericType(schema, $(offsetCol)) + } if (hasLinkPredictionCol) { SchemaUtils.appendColumn(newSchema, $(linkPredictionCol), DoubleType) } else { @@ -437,11 +439,11 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine /** Checks whether weight column is set and nonempty */ private[regression] def isSetWeightCol(params: GeneralizedLinearRegressionBase): Boolean = - params.isSet(params.weightCol) && !params.getWeightCol.isEmpty + params.isSet(params.weightCol) && params.getWeightCol.nonEmpty /** Checks whether offset column is set and nonempty */ private[regression] def isSetOffsetCol(params: GeneralizedLinearRegressionBase): Boolean = - params.isSet(params.offsetCol) && !params.getOffsetCol.isEmpty + params.isSet(params.offsetCol) && params.getOffsetCol.nonEmpty /** * Wrapper of family and link combination used in the model. @@ -981,12 +983,7 @@ class GeneralizedLinearRegressionModel private[ml] ( private lazy val familyAndLink = FamilyAndLink(this) override protected def predict(features: Vector): Double = { - if (!isSetOffsetCol(this)) { - val eta = BLAS.dot(features, coefficients) + intercept - familyAndLink.fitted(eta) - } else { - throw new SparkException("Must supply offset to predict when offset column is set.") - } + predict(features, 0.0) } /** @@ -1012,7 +1009,22 @@ class GeneralizedLinearRegressionModel private[ml] ( override protected def transformImpl(dataset: Dataset[_]): DataFrame = { val predictUDF = udf { (features: Vector, offset: Double) => predict(features, offset) } val predictLinkUDF = udf { (features: Vector, offset: Double) => predictLink(features, offset) } - val offset = if (!isSetOffsetCol(this)) lit(0.0) else col($(offsetCol)).cast(DoubleType) + /* + Offset is only validated when it's specified in the model and available in prediction data set. + When offset is specified but missing in the prediction data set, we default it to zero. + */ + val offset = { + if (!isSetOffsetCol(this)) { + lit(0.0) + } else { + if (dataset.schema.fieldNames.contains($(offsetCol))) { + SchemaUtils.checkNumericType(dataset.schema, $(offsetCol)) + col($(offsetCol)).cast(DoubleType) + } else { + lit(0.0) + } + } + } var output = dataset if ($(predictionCol).nonEmpty) { output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), offset)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index f084d5325af0..bb8b90da8514 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -853,10 +853,10 @@ class GeneralizedLinearRegressionSuite for (fitIntercept <- Seq(false, true)) { for (family <- Seq("gaussian", "binomial", "poisson", "gamma", "tweedie")) { - var trainer = new GeneralizedLinearRegression().setFamily(family) + val trainer = new GeneralizedLinearRegression().setFamily(family) .setFitIntercept(fitIntercept).setOffsetCol("offset") .setWeightCol("weight").setLinkPredictionCol("linkPrediction") - if (family == "tweedie") trainer = trainer.setVariancePower(1.5) + if (family == "tweedie") trainer.setVariancePower(1.5) val model = trainer.fit(dataset) val actual = Vectors.dense(model.intercept, model.coefficients(0), model.coefficients(1)) assert(actual ~= expected(idx) absTol 1e-4, s"Model mismatch: GLM with family = $family," + @@ -881,6 +881,31 @@ class GeneralizedLinearRegressionSuite } } + test("generalized linear regression: predict with no offset") { + val trainData = Seq( + OffsetInstance(2.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), + OffsetInstance(8.0, 2.0, 3.0, Vectors.dense(1.0, 7.0)), + OffsetInstance(3.0, 3.0, 1.0, Vectors.dense(2.0, 11.0)), + OffsetInstance(9.0, 4.0, 4.0, Vectors.dense(3.0, 13.0)) + ).toDF() + val testData = trainData.select("weight", "features") + + val trainer = new GeneralizedLinearRegression() + .setFamily("poisson") + .setWeightCol("weight") + .setOffsetCol("offset") + .setLinkPredictionCol("linkPrediction") + + val model = trainer.fit(trainData) + model.transform(testData).show() + model.transform(testData).select("features", "linkPrediction") + .collect().foreach { + case Row(features: DenseVector, linkPrediction1: Double) => + val linkPrediction2 = BLAS.dot(features, model.coefficients) + model.intercept + assert(linkPrediction1 ~= linkPrediction2 relTol 1E-5, "Link Prediction mismatch") + } + } + test("glm summary: gaussian family with weight and offset") { /* R code: From e95c25b73682669b65f194141ae08c56deb4d90c Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Mon, 13 Feb 2017 23:42:38 -0800 Subject: [PATCH 20/22] clean up --- .../spark/ml/regression/GeneralizedLinearRegressionSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index bb8b90da8514..9b208c204b5f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -897,7 +897,6 @@ class GeneralizedLinearRegressionSuite .setLinkPredictionCol("linkPrediction") val model = trainer.fit(trainData) - model.transform(testData).show() model.transform(testData).select("features", "linkPrediction") .collect().foreach { case Row(features: DenseVector, linkPrediction1: Double) => From 1e47a11f6a3e1292426ea25e7728b805c6650e1b Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Tue, 27 Jun 2017 14:47:59 -0700 Subject: [PATCH 21/22] address comments --- .../apache/spark/ml/feature/Instance.scala | 1 + .../spark/ml/optim/WeightedLeastSquares.scala | 2 +- .../GeneralizedLinearRegression.scala | 67 ++++++++----------- ...erativelyReweightedLeastSquaresSuite.scala | 20 +++--- .../GeneralizedLinearRegressionSuite.scala | 28 +------- 5 files changed, 41 insertions(+), 77 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 4bdb6a0d14a2..51389e9da173 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -31,6 +31,7 @@ private[ml] case class Instance(label: Double, weight: Double, features: Vector) /** * Case class that represents an instance of data point with * label, weight, offset and features. + * This is mainly used in GeneralizedLinearRegression currently. * * @param label Label for this data point. * @param weight The weight of this instance. diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala index 56ab9675700a..32b0af72ba9b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.optim import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.Instance +import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg._ import org.apache.spark.rdd.RDD diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index b486af643c7c..f145ae3f6a3b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -135,8 +135,8 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam def getLinkPredictionCol: String = $(linkPredictionCol) /** - * Param for offset column name. If this is not set or empty, we treat all - * instance offsets as 0.0. + * Param for offset column name. If this is not set or empty, we treat all instance offsets + * as 0.0. The feature specified as offset has a constant coefficient of 1.0. * @group param */ final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "The offset " + @@ -145,6 +145,14 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam /** @group getParam */ def getOffsetCol: String = $(offsetCol) + /** Checks whether weight column is set and nonempty. */ + private[regression] def hasWeightCol: Boolean = + isSet(weightCol) && $(weightCol).nonEmpty + + /** Checks whether offset column is set and nonempty. */ + private[regression] def hasOffsetCol: Boolean = + isSet(offsetCol) && $(offsetCol).nonEmpty + /** Checks whether we should output link prediction. */ private[regression] def hasLinkPredictionCol: Boolean = { isDefined(linkPredictionCol) && $(linkPredictionCol).nonEmpty @@ -179,9 +187,11 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam } val newSchema = super.validateAndTransformSchema(schema, fitting, featuresDataType) - if (fitting) { - if (isSetOffsetCol(this)) SchemaUtils.checkNumericType(schema, $(offsetCol)) + + if (hasOffsetCol) { + SchemaUtils.checkNumericType(schema, $(offsetCol)) } + if (hasLinkPredictionCol) { SchemaUtils.appendColumn(newSchema, $(linkPredictionCol), DoubleType) } else { @@ -318,7 +328,6 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val /** * Sets the value of param [[offsetCol]]. - * The feature specified as offset has a constant coefficient of 1.0. * If this is not set or empty, we treat all instance offsets as 0.0. * Default is not set, so all instances have offset 0.0. * @@ -364,8 +373,8 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val "GeneralizedLinearRegression was given data with 0 features, and with Param fitIntercept " + "set to false. To fit a model with 0 features, fitIntercept must be set to true." ) - val w = if (!isSetWeightCol(this)) lit(1.0) else col($(weightCol)) - val offset = if (!isSetOffsetCol(this)) lit(0.0) else col($(offsetCol)).cast(DoubleType) + val w = if (!hasWeightCol) lit(1.0) else col($(weightCol)) + val offset = if (!hasOffsetCol) lit(0.0) else col($(offsetCol)).cast(DoubleType) val model = if (familyAndLink.family == Gaussian && familyAndLink.link == Identity) { // TODO: Make standardizeFeatures and standardizeLabel configurable. @@ -437,14 +446,6 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine private[regression] val epsilon: Double = 1E-16 - /** Checks whether weight column is set and nonempty */ - private[regression] def isSetWeightCol(params: GeneralizedLinearRegressionBase): Boolean = - params.isSet(params.weightCol) && params.getWeightCol.nonEmpty - - /** Checks whether offset column is set and nonempty */ - private[regression] def isSetOffsetCol(params: GeneralizedLinearRegressionBase): Boolean = - params.isSet(params.offsetCol) && params.getOffsetCol.nonEmpty - /** * Wrapper of family and link combination used in the model. */ @@ -476,14 +477,14 @@ object GeneralizedLinearRegression extends DefaultParamsReadable[GeneralizedLine } /** - * The reweight function used to update offsets and weights + * The reweight function used to update working labels and weights * at each iteration of [[IterativelyReweightedLeastSquares]]. */ val reweightFunc: (OffsetInstance, WeightedLeastSquaresModel) => (Double, Double) = { (instance: OffsetInstance, model: WeightedLeastSquaresModel) => { - val eta = model.predict(instance.features) - val mu = fitted(eta + instance.offset) - val newLabel = eta + (instance.label - mu) * link.deriv(mu) + val eta = model.predict(instance.features) + instance.offset + val mu = fitted(eta) + val newLabel = eta - instance.offset + (instance.label - mu) * link.deriv(mu) val newWeight = instance.weight / (math.pow(this.link.deriv(mu), 2.0) * family.variance(mu)) (newLabel, newWeight) } @@ -989,7 +990,7 @@ class GeneralizedLinearRegressionModel private[ml] ( /** * Calculates the predicted value when offset is set. */ - protected def predict(features: Vector, offset: Double): Double = { + def predict(features: Vector, offset: Double): Double = { val eta = predictLink(features, offset) familyAndLink.fitted(eta) } @@ -1009,22 +1010,8 @@ class GeneralizedLinearRegressionModel private[ml] ( override protected def transformImpl(dataset: Dataset[_]): DataFrame = { val predictUDF = udf { (features: Vector, offset: Double) => predict(features, offset) } val predictLinkUDF = udf { (features: Vector, offset: Double) => predictLink(features, offset) } - /* - Offset is only validated when it's specified in the model and available in prediction data set. - When offset is specified but missing in the prediction data set, we default it to zero. - */ - val offset = { - if (!isSetOffsetCol(this)) { - lit(0.0) - } else { - if (dataset.schema.fieldNames.contains($(offsetCol))) { - SchemaUtils.checkNumericType(dataset.schema, $(offsetCol)) - col($(offsetCol)).cast(DoubleType) - } else { - lit(0.0) - } - } - } + + val offset = if (!hasOffsetCol) lit(0.0) else col($(offsetCol)).cast(DoubleType) var output = dataset if ($(predictionCol).nonEmpty) { output = output.withColumn($(predictionCol), predictUDF(col($(featuresCol)), offset)) @@ -1218,11 +1205,11 @@ class GeneralizedLinearRegressionSummary private[regression] ( private def prediction: Column = col(predictionCol) private def weight: Column = { - if (!isSetWeightCol(model)) lit(1.0) else col(model.getWeightCol) + if (!model.hasWeightCol) lit(1.0) else col(model.getWeightCol) } private def offset: Column = { - if (!isSetOffsetCol(model)) lit(0.0) else col(model.getOffsetCol).cast(DoubleType) + if (!model.hasOffsetCol) lit(0.0) else col(model.getOffsetCol).cast(DoubleType) } private[regression] lazy val devianceResiduals: DataFrame = { @@ -1285,8 +1272,8 @@ class GeneralizedLinearRegressionSummary private[regression] ( Estimate intercept analytically when there is no offset, or when there is offset but the model is Gaussian family with identity link. Otherwise, fit an intercept only model. */ - if (!isSetOffsetCol(model) || - (isSetOffsetCol(model) && family == Gaussian && link == Identity)) { + if (!model.hasOffsetCol || + (model.hasOffsetCol && family == Gaussian && link == Identity)) { val agg = predictions.agg(sum(weight.multiply( label.minus(offset))), sum(weight)).first() link.link(agg.getDouble(0) / agg.getDouble(1)) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala index cdce453f411a..6d143504fcf5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquaresSuite.scala @@ -39,11 +39,11 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes w <- c(1, 2, 3, 4) */ instances1 = sc.parallelize(Seq( - Instance(1.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(0.0, 2.0, Vectors.dense(1.0, 2.0)), - Instance(1.0, 3.0, Vectors.dense(2.0, 1.0)), - Instance(0.0, 4.0, Vectors.dense(3.0, 3.0)) - ), 2).map(new OffsetInstance(_)) + OffsetInstance(1.0, 1.0, 0.0, Vectors.dense(0.0, 5.0).toSparse), + OffsetInstance(0.0, 2.0, 0.0, Vectors.dense(1.0, 2.0)), + OffsetInstance(1.0, 3.0, 0.0, Vectors.dense(2.0, 1.0)), + OffsetInstance(0.0, 4.0, 0.0, Vectors.dense(3.0, 3.0)) + ), 2) /* R code: @@ -52,11 +52,11 @@ class IterativelyReweightedLeastSquaresSuite extends SparkFunSuite with MLlibTes w <- c(1, 2, 3, 4) */ instances2 = sc.parallelize(Seq( - Instance(2.0, 1.0, Vectors.dense(0.0, 5.0).toSparse), - Instance(8.0, 2.0, Vectors.dense(1.0, 7.0)), - Instance(3.0, 3.0, Vectors.dense(2.0, 11.0)), - Instance(9.0, 4.0, Vectors.dense(3.0, 13.0)) - ), 2).map(new OffsetInstance(_)) + OffsetInstance(2.0, 1.0, 0.0, Vectors.dense(0.0, 5.0).toSparse), + OffsetInstance(8.0, 2.0, 0.0, Vectors.dense(1.0, 7.0)), + OffsetInstance(3.0, 3.0, 0.0, Vectors.dense(2.0, 11.0)), + OffsetInstance(9.0, 4.0, 0.0, Vectors.dense(3.0, 13.0)) + ), 2) } test("IRLS against GLM with Binomial errors") { diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala index 9b208c204b5f..f6e85c52c25a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GeneralizedLinearRegressionSuite.scala @@ -798,7 +798,7 @@ class GeneralizedLinearRegressionSuite } } - test("generalized linear regression with offset") { + test("generalized linear regression with weight and offset") { /* R code: library(statmod) @@ -881,30 +881,6 @@ class GeneralizedLinearRegressionSuite } } - test("generalized linear regression: predict with no offset") { - val trainData = Seq( - OffsetInstance(2.0, 1.0, 2.0, Vectors.dense(0.0, 5.0)), - OffsetInstance(8.0, 2.0, 3.0, Vectors.dense(1.0, 7.0)), - OffsetInstance(3.0, 3.0, 1.0, Vectors.dense(2.0, 11.0)), - OffsetInstance(9.0, 4.0, 4.0, Vectors.dense(3.0, 13.0)) - ).toDF() - val testData = trainData.select("weight", "features") - - val trainer = new GeneralizedLinearRegression() - .setFamily("poisson") - .setWeightCol("weight") - .setOffsetCol("offset") - .setLinkPredictionCol("linkPrediction") - - val model = trainer.fit(trainData) - model.transform(testData).select("features", "linkPrediction") - .collect().foreach { - case Row(features: DenseVector, linkPrediction1: Double) => - val linkPrediction2 = BLAS.dot(features, model.coefficients) + model.intercept - assert(linkPrediction1 ~= linkPrediction2 relTol 1E-5, "Link Prediction mismatch") - } - } - test("glm summary: gaussian family with weight and offset") { /* R code: @@ -1309,7 +1285,7 @@ class GeneralizedLinearRegressionSuite -0.16134949 0.20807694 -0.22544551 0.03258777 residuals(model, type = "working") 1 2 3 4 - 0.135315831 -0.084390309 0.113219135 -0.008279688 + 0.135315831 -0.084390309 0.113219135 -0.008279688 residuals(model, type = "response") 1 2 3 4 -0.1923918 0.2565224 -0.1496381 0.0320653 From db0ac937b9df678f7f58b26bab1afa43d77eb5a1 Mon Sep 17 00:00:00 2001 From: actuaryzhang Date: Thu, 29 Jun 2017 10:50:15 -0700 Subject: [PATCH 22/22] address comments --- .../main/scala/org/apache/spark/ml/feature/Instance.scala | 7 +------ .../spark/ml/regression/GeneralizedLinearRegression.scala | 6 ++++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala index 51389e9da173..dd56fbbfa2b6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala @@ -44,12 +44,7 @@ private[ml] case class OffsetInstance( offset: Double, features: Vector) { - /** Constructs from an [[Instance]] object and offset */ - def this(instance: Instance, offset: Double = 0.0) = { - this(instance.label, instance.weight, offset, instance.features) - } - /** Converts to an [[Instance]] object by leaving out the offset. */ - private[ml] def toInstance: Instance = Instance(label, weight, features) + def toInstance: Instance = Instance(label, weight, features) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index f145ae3f6a3b..1d942f2f2016 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -139,10 +139,12 @@ private[regression] trait GeneralizedLinearRegressionBase extends PredictorParam * as 0.0. The feature specified as offset has a constant coefficient of 1.0. * @group param */ + @Since("2.3.0") final val offsetCol: Param[String] = new Param[String](this, "offsetCol", "The offset " + "column name. If this is not set or empty, we treat all instance offsets as 0.0") /** @group getParam */ + @Since("2.3.0") def getOffsetCol: String = $(offsetCol) /** Checks whether weight column is set and nonempty. */ @@ -333,7 +335,7 @@ class GeneralizedLinearRegression @Since("2.0.0") (@Since("2.0.0") override val * * @group setParam */ - @Since("2.2.0") + @Since("2.3.0") def setOffsetCol(value: String): this.type = set(offsetCol, value) /** @@ -990,7 +992,7 @@ class GeneralizedLinearRegressionModel private[ml] ( /** * Calculates the predicted value when offset is set. */ - def predict(features: Vector, offset: Double): Double = { + private def predict(features: Vector, offset: Double): Double = { val eta = predictLink(features, offset) familyAndLink.fitted(eta) }