Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions R/pkg/inst/tests/testthat/test_mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ test_that("spark.glm and predict", {
rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)

# binomial family
binomialTraining <- training[training$Species %in% c("versicolor", "virginica"), ]
model <- spark.glm(binomialTraining, Species ~ Sepal_Length + Sepal_Width,
family = binomial(link = "logit"))
prediction <- predict(model, binomialTraining)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
expected <- c("virginica", "virginica", "virginica", "versicolor", "virginica",
"versicolor", "virginica", "versicolor", "virginica", "versicolor")
expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)

# poisson family
model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
family = poisson(link = identity))
Expand Down Expand Up @@ -128,10 +138,10 @@ test_that("spark.glm summary", {
expect_equal(stats$aic, rStats$aic)

# Test spark.glm works with weighted dataset
a1 <- c(0, 1, 2, 3)
a2 <- c(5, 2, 1, 3)
w <- c(1, 2, 3, 4)
b <- c(1, 0, 1, 0)
a1 <- c(0, 1, 2, 3, 4)
a2 <- c(5, 2, 1, 3, 2)
w <- c(1, 2, 3, 4, 5)
b <- c(1, 0, 1, 0, 0)
data <- as.data.frame(cbind(a1, a2, w, b))
df <- createDataFrame(data)

Expand All @@ -158,7 +168,7 @@ test_that("spark.glm summary", {
data <- as.data.frame(cbind(a1, a2, b))
df <- suppressWarnings(createDataFrame(data))
regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result
expect_equal(regStats$aic, 14.00976, tolerance = 1e-4) # 14.00976 is from summary() result
})

test_that("spark.glm save/load", {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.feature.RFormula
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NominalAttribute}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.regression._
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

private[r] class GeneralizedLinearRegressionWrapper private (
val pipeline: PipelineModel,
Expand All @@ -42,6 +47,8 @@ private[r] class GeneralizedLinearRegressionWrapper private (
val rNumIterations: Int,
val isLoaded: Boolean = false) extends MLWritable {

import GeneralizedLinearRegressionWrapper._

private val glm: GeneralizedLinearRegressionModel =
pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel]

Expand All @@ -52,7 +59,15 @@ private[r] class GeneralizedLinearRegressionWrapper private (
def residuals(residualsType: String): DataFrame = glm.summary.residuals(residualsType)

def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset).drop(glm.getFeaturesCol)
if (rFamily == "binomial") {
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_PROB_COL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to make R models incompatible with other languages when you persist them. (They already are because of having hidden Pipelines, but that is fixable.) This, however, will encode special behavior which is only respected when the model is loaded from R, not from other languages.

One option is to encode this in a SQLTransformer.

I'm also worried that these hard-coded column names will lead to future bug reports about conflicting input column names.

It looks like this same issue appears in other PRs for R, such as [https://issues.apache.org/jira/browse/SPARK-18401]. Given the pervasiveness and that we're in QA right now, I'd recommend we not worry about it for 2.1 and delay fixing it until 2.2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I totally agree that the hard-coded column name issue should be fixed, and I already have some ideas for improving the SparkR ML wrappers (which include this one). This can go into the plan for the next release, and I will write a simple design document for review.
For the ProbabilityToPrediction issue, the SQLTransformer idea sounds good, and I will try to fix it in a follow-up PR. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds great--thanks!

.drop(PREDICTED_LABEL_INDEX_COL)
.drop(glm.getFeaturesCol)
} else {
pipeline.transform(dataset)
.drop(glm.getFeaturesCol)
}
}

override def write: MLWriter =
Expand All @@ -62,6 +77,10 @@ private[r] class GeneralizedLinearRegressionWrapper private (
private[r] object GeneralizedLinearRegressionWrapper
extends MLReadable[GeneralizedLinearRegressionWrapper] {

val PREDICTED_LABEL_PROB_COL = "pred_label_prob"
val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
val PREDICTED_LABEL_COL = "prediction"

def fit(
formula: String,
data: DataFrame,
Expand All @@ -71,8 +90,8 @@ private[r] object GeneralizedLinearRegressionWrapper
maxIter: Int,
weightCol: String,
regParam: Double): GeneralizedLinearRegressionWrapper = {
val rFormula = new RFormula()
.setFormula(formula)
val rFormula = new RFormula().setFormula(formula)
if (family == "binomial") rFormula.setForceIndexLabel(true)
RWrapperUtils.checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)
// get labels and feature names from output schema
Expand All @@ -90,9 +109,27 @@ private[r] object GeneralizedLinearRegressionWrapper
.setWeightCol(weightCol)
.setRegParam(regParam)
.setFeaturesCol(rFormula.getFeaturesCol)
val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, glr))
.fit(data)
val pipeline = if (family == "binomial") {
// Convert prediction from probability to label index.
val probToPred = new ProbabilityToPrediction()
.setInputCol(PREDICTED_LABEL_PROB_COL)
.setOutputCol(PREDICTED_LABEL_INDEX_COL)
// Convert prediction from label index to original label.
val labelAttr = Attribute.fromStructField(schema(rFormulaModel.getLabelCol))
.asInstanceOf[NominalAttribute]
val labels = labelAttr.values.get
val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
.setOutputCol(PREDICTED_LABEL_COL)
.setLabels(labels)

new Pipeline()
.setStages(Array(rFormulaModel, glr.setPredictionCol(PREDICTED_LABEL_PROB_COL),
probToPred, idxToStr))
.fit(data)
} else {
new Pipeline().setStages(Array(rFormulaModel, glr)).fit(data)
}

val glm: GeneralizedLinearRegressionModel =
pipeline.stages(1).asInstanceOf[GeneralizedLinearRegressionModel]
Expand Down Expand Up @@ -200,3 +237,27 @@ private[r] object GeneralizedLinearRegressionWrapper
}
}
}

/**
* This utility transformer converts the predicted value of GeneralizedLinearRegressionModel
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps this could be reusable and should go to RWrapperUtils.scala?

Copy link
Contributor Author

@yanboliang yanboliang Nov 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an inherent feature of the other classification algorithms, which currently extend Classifier; only GLM uses a separate conversion, since GeneralizedLinearRegression extends Regressor. So it should not be reused elsewhere for now; I'm open to moving it to RWrapperUtils.scala when that becomes necessary. Thanks.

* with "binomial" family from probability to prediction according to threshold 0.5.
*/
private[r] class ProbabilityToPrediction private[r] (override val uid: String)
  extends Transformer with HasInputCol with HasOutputCol with DefaultParamsWritable {

  /** Creates an instance with a freshly generated uid prefixed by "probToPred". */
  def this() = this(Identifiable.randomUID("probToPred"))

  /** Sets the name of the column that holds the predicted probability. */
  def setInputCol(value: String): this.type = set(inputCol, value)

  /** Sets the name of the column that receives the rounded prediction. */
  def setOutputCol(value: String): this.type = set(outputCol, value)

  /**
   * Appends the output column, typed as DoubleType, to the incoming schema.
   *
   * Fails fast with an IllegalArgumentException when the configured input column is
   * absent, so a misconfigured stage is reported during schema validation rather than
   * later at query analysis time.
   */
  override def transformSchema(schema: StructType): StructType = {
    val inputColName = $(inputCol)
    require(schema.fieldNames.contains(inputColName),
      s"Input column $inputColName does not exist.")
    StructType(schema.fields :+ StructField($(outputCol), DoubleType))
  }

  /**
   * Adds the output column, computed by rounding the input column to zero decimal places.
   */
  override def transform(dataset: Dataset[_]): DataFrame = {
    // NOTE(review): the 0.5 threshold relies on Spark SQL `round` rounding half up, so
    // that a probability of exactly 0.5 maps to 1.0 - confirm against the Spark version.
    dataset.withColumn($(outputCol), round(col($(inputCol))))
  }

  /** Returns a copy of this transformer with `extra` params applied via `defaultCopy`. */
  override def copy(extra: ParamMap): ProbabilityToPrediction = defaultCopy(extra)
}