Skip to content

Commit 95eb06b

Browse files
committed
[SPARK-18438][SPARKR][ML] spark.mlp should support RFormula.
## What changes were proposed in this pull request? ```spark.mlp``` should support ```RFormula``` like other ML algorithm wrappers. BTW, I did some cleanup and improvement for ```spark.mlp```. ## How was this patch tested? Unit tests. Author: Yanbo Liang <[email protected]> Closes #15883 from yanboliang/spark-18438.
1 parent 4ac9759 commit 95eb06b

File tree

4 files changed

+96
-60
lines changed

4 files changed

+96
-60
lines changed

R/pkg/R/generics.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,7 @@ setGeneric("spark.logit", function(data, formula, ...) { standardGeneric("spark.
13731373

13741374
#' @rdname spark.mlp
13751375
#' @export
1376-
setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })
1376+
setGeneric("spark.mlp", function(data, formula, ...) { standardGeneric("spark.mlp") })
13771377

13781378
#' @rdname spark.naiveBayes
13791379
#' @export

R/pkg/R/mllib.R

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
525525
#' @note spark.isoreg since 2.1.0
526526
setMethod("spark.isoreg", signature(data = "SparkDataFrame", formula = "formula"),
527527
function(data, formula, isotonic = TRUE, featureIndex = 0, weightCol = NULL) {
528-
formula <- paste0(deparse(formula), collapse = "")
528+
formula <- paste(deparse(formula), collapse = "")
529529

530530
if (is.null(weightCol)) {
531531
weightCol <- ""
@@ -775,7 +775,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
775775
tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
776776
thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
777777
probabilityCol = "probability") {
778-
formula <- paste0(deparse(formula), collapse = "")
778+
formula <- paste(deparse(formula), collapse = "")
779779

780780
if (is.null(weightCol)) {
781781
weightCol <- ""
@@ -858,6 +858,8 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
858858
#' Multilayer Perceptron}
859859
#'
860860
#' @param data a \code{SparkDataFrame} of observations and labels for model fitting.
861+
#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
862+
#' operators are supported, including '~', '.', ':', '+', and '-'.
861863
#' @param blockSize blockSize parameter.
862864
#' @param layers integer vector containing the number of nodes for each layer
863865
#' @param solver solver parameter, supported options: "gd" (minibatch gradient descent) or "l-bfgs".
@@ -870,7 +872,7 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
870872
#' @param ... additional arguments passed to the method.
871873
#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
872874
#' @rdname spark.mlp
873-
#' @aliases spark.mlp,SparkDataFrame-method
875+
#' @aliases spark.mlp,SparkDataFrame,formula-method
874876
#' @name spark.mlp
875877
#' @seealso \link{read.ml}
876878
#' @export
@@ -879,7 +881,7 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
879881
#' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
880882
#'
881883
#' # fit a Multilayer Perceptron Classification Model
882-
#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
884+
#' model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
883885
#' maxIter = 100, tol = 0.5, stepSize = 1, seed = 1,
884886
#' initialWeights = c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
885887
#'
@@ -896,9 +898,10 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
896898
#' summary(savedModel)
897899
#' }
898900
#' @note spark.mlp since 2.1.0
899-
setMethod("spark.mlp", signature(data = "SparkDataFrame"),
900-
function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
901+
setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
902+
function(data, formula, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
901903
tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL) {
904+
formula <- paste(deparse(formula), collapse = "")
902905
if (is.null(layers)) {
903906
stop ("layers must be a integer vector with length > 1.")
904907
}
@@ -913,7 +916,7 @@ setMethod("spark.mlp", signature(data = "SparkDataFrame"),
913916
initialWeights <- as.array(as.numeric(na.omit(initialWeights)))
914917
}
915918
jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
916-
"fit", data@sdf, as.integer(blockSize), as.array(layers),
919+
"fit", data@sdf, formula, as.integer(blockSize), as.array(layers),
917920
as.character(solver), as.integer(maxIter), as.numeric(tol),
918921
as.numeric(stepSize), seed, initialWeights)
919922
new("MultilayerPerceptronClassificationModel", jobj = jobj)
@@ -936,20 +939,23 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
936939
# Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}
937940

938941
#' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}
939-
#' @return \code{summary} returns a list containing \code{labelCount}, \code{layers}, and
940-
#' \code{weights}. For \code{weights}, it is a numeric vector with length equal to
941-
#' the expected given the architecture (i.e., for 8-10-2 network, 100 connection weights).
942+
#' @return \code{summary} returns a list containing \code{numOfInputs}, \code{numOfOutputs},
943+
#' \code{layers}, and \code{weights}. For \code{weights}, it is a numeric vector with
944+
#' length equal to the expected given the architecture (i.e., for 8-10-2 network,
945+
#' 112 connection weights).
942946
#' @rdname spark.mlp
943947
#' @export
944948
#' @aliases summary,MultilayerPerceptronClassificationModel-method
945949
#' @note summary(MultilayerPerceptronClassificationModel) since 2.1.0
946950
setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel"),
947951
function(object) {
948952
jobj <- object@jobj
949-
labelCount <- callJMethod(jobj, "labelCount")
950953
layers <- unlist(callJMethod(jobj, "layers"))
954+
numOfInputs <- head(layers, n = 1)
955+
numOfOutputs <- tail(layers, n = 1)
951956
weights <- callJMethod(jobj, "weights")
952-
list(labelCount = labelCount, layers = layers, weights = weights)
957+
list(numOfInputs = numOfInputs, numOfOutputs = numOfOutputs,
958+
layers = layers, weights = weights)
953959
})
954960

955961
#' Naive Bayes Models

R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -371,12 +371,13 @@ test_that("spark.kmeans", {
371371
test_that("spark.mlp", {
372372
df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
373373
source = "libsvm")
374-
model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100,
375-
tol = 0.5, stepSize = 1, seed = 1)
374+
model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
375+
solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
376376

377377
# Test summary method
378378
summary <- summary(model)
379-
expect_equal(summary$labelCount, 3)
379+
expect_equal(summary$numOfInputs, 4)
380+
expect_equal(summary$numOfOutputs, 3)
380381
expect_equal(summary$layers, c(4, 5, 4, 3))
381382
expect_equal(length(summary$weights), 64)
382383
expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
@@ -385,7 +386,7 @@ test_that("spark.mlp", {
385386
# Test predict method
386387
mlpTestDF <- df
387388
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
388-
expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1))
389+
expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
389390

390391
# Test model save/load
391392
modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
@@ -395,46 +396,68 @@ test_that("spark.mlp", {
395396
model2 <- read.ml(modelPath)
396397
summary2 <- summary(model2)
397398

398-
expect_equal(summary2$labelCount, 3)
399+
expect_equal(summary2$numOfInputs, 4)
400+
expect_equal(summary2$numOfOutputs, 3)
399401
expect_equal(summary2$layers, c(4, 5, 4, 3))
400402
expect_equal(length(summary2$weights), 64)
401403

402404
unlink(modelPath)
403405

404406
# Test default parameter
405-
model <- spark.mlp(df, layers = c(4, 5, 4, 3))
407+
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
406408
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
407-
expect_equal(head(mlpPredictions$prediction, 10), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 0))
409+
expect_equal(head(mlpPredictions$prediction, 10),
410+
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
408411

409412
# Test illegal parameter
410-
expect_error(spark.mlp(df, layers = NULL), "layers must be a integer vector with length > 1.")
411-
expect_error(spark.mlp(df, layers = c()), "layers must be a integer vector with length > 1.")
412-
expect_error(spark.mlp(df, layers = c(3)), "layers must be a integer vector with length > 1.")
413+
expect_error(spark.mlp(df, label ~ features, layers = NULL),
414+
"layers must be a integer vector with length > 1.")
415+
expect_error(spark.mlp(df, label ~ features, layers = c()),
416+
"layers must be a integer vector with length > 1.")
417+
expect_error(spark.mlp(df, label ~ features, layers = c(3)),
418+
"layers must be a integer vector with length > 1.")
413419

414420
# Test random seed
415421
# default seed
416-
model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10)
422+
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
417423
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
418-
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 2, 0, 1))
424+
expect_equal(head(mlpPredictions$prediction, 10),
425+
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
419426
# seed equals 10
420-
model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
427+
model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
421428
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
422-
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 2, 1, 2, 2, 1, 0, 0, 1))
429+
expect_equal(head(mlpPredictions$prediction, 10),
430+
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
423431

424432
# test initialWeights
425-
model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
433+
model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
426434
c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
427435
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
428-
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
436+
expect_equal(head(mlpPredictions$prediction, 10),
437+
c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
429438

430-
model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
439+
model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
431440
c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
432441
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
433-
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
442+
expect_equal(head(mlpPredictions$prediction, 10),
443+
c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
434444

435-
model <- spark.mlp(df, layers = c(4, 3), maxIter = 2)
445+
model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
436446
mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
437-
expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 2, 1, 0, 0, 1))
447+
expect_equal(head(mlpPredictions$prediction, 10),
448+
c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0"))
449+
450+
# Test formula works well
451+
df <- suppressWarnings(createDataFrame(iris))
452+
model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
453+
layers = c(4, 3))
454+
summary <- summary(model)
455+
expect_equal(summary$numOfInputs, 4)
456+
expect_equal(summary$numOfOutputs, 3)
457+
expect_equal(summary$layers, c(4, 3))
458+
expect_equal(length(summary$weights), 15)
459+
expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413,
460+
-10.2376130), tolerance = 1e-6)
438461
})
439462

440463
test_that("spark.naiveBayes", {

mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala

Lines changed: 34 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,29 @@ import org.json4s.jackson.JsonMethods._
2424

2525
import org.apache.spark.ml.{Pipeline, PipelineModel}
2626
import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier}
27+
import org.apache.spark.ml.feature.{IndexToString, RFormula}
2728
import org.apache.spark.ml.linalg.Vectors
29+
import org.apache.spark.ml.r.RWrapperUtils._
2830
import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
2931
import org.apache.spark.sql.{DataFrame, Dataset}
3032

3133
private[r] class MultilayerPerceptronClassifierWrapper private (
32-
val pipeline: PipelineModel,
33-
val labelCount: Long,
34-
val layers: Array[Int],
35-
val weights: Array[Double]
34+
val pipeline: PipelineModel
3635
) extends MLWritable {
3736

37+
import MultilayerPerceptronClassifierWrapper._
38+
39+
val mlpModel: MultilayerPerceptronClassificationModel =
40+
pipeline.stages(1).asInstanceOf[MultilayerPerceptronClassificationModel]
41+
42+
val weights: Array[Double] = mlpModel.weights.toArray
43+
val layers: Array[Int] = mlpModel.layers
44+
3845
def transform(dataset: Dataset[_]): DataFrame = {
3946
pipeline.transform(dataset)
47+
.drop(mlpModel.getFeaturesCol)
48+
.drop(mlpModel.getLabelCol)
49+
.drop(PREDICTED_LABEL_INDEX_COL)
4050
}
4151

4252
/**
@@ -49,10 +59,12 @@ private[r] class MultilayerPerceptronClassifierWrapper private (
4959
private[r] object MultilayerPerceptronClassifierWrapper
5060
extends MLReadable[MultilayerPerceptronClassifierWrapper] {
5161

62+
val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
5263
val PREDICTED_LABEL_COL = "prediction"
5364

5465
def fit(
5566
data: DataFrame,
67+
formula: String,
5668
blockSize: Int,
5769
layers: Array[Int],
5870
solver: String,
@@ -62,8 +74,13 @@ private[r] object MultilayerPerceptronClassifierWrapper
6274
seed: String,
6375
initialWeights: Array[Double]
6476
): MultilayerPerceptronClassifierWrapper = {
77+
val rFormula = new RFormula()
78+
.setFormula(formula)
79+
.setForceIndexLabel(true)
80+
checkDataColumns(rFormula, data)
81+
val rFormulaModel = rFormula.fit(data)
6582
// get labels and feature names from output schema
66-
val schema = data.schema
83+
val (_, labels) = getFeaturesAndLabels(rFormulaModel, data)
6784

6885
// assemble and fit the pipeline
6986
val mlp = new MultilayerPerceptronClassifier()
@@ -73,25 +90,25 @@ private[r] object MultilayerPerceptronClassifierWrapper
7390
.setMaxIter(maxIter)
7491
.setTol(tol)
7592
.setStepSize(stepSize)
76-
.setPredictionCol(PREDICTED_LABEL_COL)
93+
.setFeaturesCol(rFormula.getFeaturesCol)
94+
.setLabelCol(rFormula.getLabelCol)
95+
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
7796
if (seed != null && seed.length > 0) mlp.setSeed(seed.toInt)
7897
if (initialWeights != null) {
7998
require(initialWeights.length > 0)
8099
mlp.setInitialWeights(Vectors.dense(initialWeights))
81100
}
82101

102+
val idxToStr = new IndexToString()
103+
.setInputCol(PREDICTED_LABEL_INDEX_COL)
104+
.setOutputCol(PREDICTED_LABEL_COL)
105+
.setLabels(labels)
106+
83107
val pipeline = new Pipeline()
84-
.setStages(Array(mlp))
108+
.setStages(Array(rFormulaModel, mlp, idxToStr))
85109
.fit(data)
86110

87-
val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel =
88-
pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]
89-
90-
val weights = multilayerPerceptronClassificationModel.weights.toArray
91-
val layersFromPipeline = multilayerPerceptronClassificationModel.layers
92-
val labelCount = data.select("label").distinct().count()
93-
94-
new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights)
111+
new MultilayerPerceptronClassifierWrapper(pipeline)
95112
}
96113

97114
/**
@@ -107,17 +124,10 @@ private[r] object MultilayerPerceptronClassifierWrapper
107124

108125
override def load(path: String): MultilayerPerceptronClassifierWrapper = {
109126
implicit val format = DefaultFormats
110-
val rMetadataPath = new Path(path, "rMetadata").toString
111127
val pipelinePath = new Path(path, "pipeline").toString
112128

113-
val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
114-
val rMetadata = parse(rMetadataStr)
115-
val labelCount = (rMetadata \ "labelCount").extract[Long]
116-
val layers = (rMetadata \ "layers").extract[Array[Int]]
117-
val weights = (rMetadata \ "weights").extract[Array[Double]]
118-
119129
val pipeline = PipelineModel.load(pipelinePath)
120-
new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights)
130+
new MultilayerPerceptronClassifierWrapper(pipeline)
121131
}
122132
}
123133

@@ -128,10 +138,7 @@ private[r] object MultilayerPerceptronClassifierWrapper
128138
val rMetadataPath = new Path(path, "rMetadata").toString
129139
val pipelinePath = new Path(path, "pipeline").toString
130140

131-
val rMetadata = ("class" -> instance.getClass.getName) ~
132-
("labelCount" -> instance.labelCount) ~
133-
("layers" -> instance.layers.toSeq) ~
134-
("weights" -> instance.weights.toArray.toSeq)
141+
val rMetadata = "class" -> instance.getClass.getName
135142
val rMetadataJson: String = compact(render(rMetadata))
136143
sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
137144

0 commit comments

Comments
 (0)