diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 0cd0d75df0f7..e26f9a7a2ab6 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -11,7 +11,8 @@ Depends:
     R (>= 3.0),
     methods,
 Suggests:
-    testthat
+    testthat,
+    e1071
 Description: R frontend for Spark
 License: Apache License (== 2.0)
 Collate:
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 636d39e1e9ca..5d8a4b1d6ed8 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -15,7 +15,8 @@ exportMethods("glm",
               "predict",
               "summary",
               "kmeans",
-              "fitted")
+              "fitted",
+              "naiveBayes")

 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 6ad71fcb4671..46b115f45e53 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1175,3 +1175,7 @@ setGeneric("kmeans")
 #' @rdname fitted
 #' @export
 setGeneric("fitted")
+
+#' @rdname naiveBayes
+#' @export
+setGeneric("naiveBayes", function(formula, data, ...) { standardGeneric("naiveBayes") })
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 5c0d3dcf3af9..87c97f90c720 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -42,7 +42,7 @@ setClass("PipelineModel", representation(model = "jobj"))
 #' @rdname glm
 #' @export
 #' @examples
-#'\dontrun{
+#' \dontrun{
 #' sc <- sparkR.init()
 #' sqlContext <- sparkRSQL.init(sc)
 #' data(iris)
@@ -71,7 +71,7 @@ setMethod("glm", signature(formula = "formula", family = "ANY", data = "DataFram
 #' @rdname predict
 #' @export
 #' @examples
-#'\dontrun{
+#' \dontrun{
 #' model <- glm(y ~ x, trainingData)
 #' predicted <- predict(model, testData)
 #' showDF(predicted)
@@ -97,7 +97,7 @@ setMethod("predict", signature(object = "PipelineModel"),
 #' @rdname summary
 #' @export
 #' @examples
-#'\dontrun{
+#' \dontrun{
 #' model <- glm(y ~ x, trainingData)
 #' summary(model)
 #'}
@@ -135,6 +135,19 @@ setMethod("summary", signature(object = "PipelineModel"),
              colnames(coefficients) <- unlist(features)
              rownames(coefficients) <- 1:k
              return(list(coefficients = coefficients, size = size, cluster = dataFrame(cluster)))
+            } else if (modelName == "NaiveBayesModel") {
+              labels <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                    "getNaiveBayesLabels", object@model)
+              pi <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                "getNaiveBayesPi", object@model)
+              theta <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers",
+                                   "getNaiveBayesTheta", object@model)
+              pi <- t(as.matrix(unlist(pi)))
+              colnames(pi) <- unlist(labels)
+              theta <- matrix(theta, nrow = length(labels))
+              rownames(theta) <- unlist(labels)
+              colnames(theta) <- unlist(features)
+              return(list(pi = pi, theta = theta))
            } else {
              stop(paste("Unsupported model", modelName, sep = " "))
            }
@@ -152,7 +165,7 @@ setMethod("summary", signature(object = "PipelineModel"),
 #' @rdname kmeans
 #' @export
 #' @examples
-#'\dontrun{
+#' \dontrun{
 #' model <- kmeans(x, centers = 2, algorithm="random")
 #'}
 setMethod("kmeans", signature(x = "DataFrame"),
@@ -173,7 +186,7 @@ setMethod("kmeans", signature(x = "DataFrame"),
 #' @rdname fitted
 #' @export
 #' @examples
-#'\dontrun{
+#' \dontrun{
 #' model <- kmeans(trainingData, 2)
 #' fitted.model <- fitted(model)
 #' showDF(fitted.model)
@@ -192,3 +205,35 @@ setMethod("fitted", signature(object = "PipelineModel"),
            stop(paste("Unsupported model", modelName, sep = " "))
          }
        })
+
+#' Fit a naive Bayes model
+#'
+#' Fit a naive Bayes model, similar to R's naiveBayes() except that the two arguments 'subset'
+#' and 'na.action' are omitted. Users can instead use DataFrame's 'subset' function, and its
+#' 'fillna' or 'na.omit' functions, respectively, to preprocess their DataFrame. This interface
+#' uses na.omit to remove rows with NA values.
+#'
+#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', '.', ':', '+', and '-'.
+#' @param data DataFrame for training
+#' @param lambda Smoothing parameter
+#' @param modelType Either "multinomial" or "bernoulli". Default "multinomial".
+#' @return A fitted naive Bayes model.
+#' @rdname naiveBayes
+#' @export
+#' @examples
+#' \dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' df <- createDataFrame(sqlContext, infert)
+#' model <- naiveBayes(education ~ ., df, lambda = 1, modelType = "multinomial")
+#'}
+setMethod("naiveBayes", signature(formula = "formula", data = "DataFrame"),
+          function(formula, data, lambda = 1, modelType = c("multinomial", "bernoulli"), ...) {
+            data <- na.omit(data)
+            formula <- paste(deparse(formula), collapse = "")
+            modelType <- match.arg(modelType)
+            model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitNaiveBayes",
+                                 formula, data@sdf, lambda, modelType)
+            return(new("PipelineModel", model = model))
+          })
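The R surface added above is small: `naiveBayes()` fits the model, `summary()` exposes `pi` and `theta`, and `predict()` appends a `rawLabelsPrediction` column. A minimal usage sketch, assuming a running SparkR backend and the `infert` dataset shipped with base R; the shapes follow from the summary branch above, so treat the commented output as illustrative:

```r
# Sketch: fit and inspect the new naive Bayes API end to end.
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, infert)

model <- naiveBayes(education ~ ., df, lambda = 1, modelType = "multinomial")
summary.model <- summary(model)
summary.model$pi     # 1 x k matrix of class priors, columns named by original label
summary.model$theta  # k x p matrix of per-label feature probabilities
```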
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index e120462964d1..b62dcd873ce5 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -141,3 +141,27 @@ test_that("kmeans", {
   cluster <- summary.model$cluster
   expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))
 })
+
+test_that("naiveBayes", {
+  training <- suppressWarnings(createDataFrame(sqlContext, infert))
+
+  model <- naiveBayes(education ~ ., data = training, lambda = 1, modelType = "multinomial")
+  sample <- take(select(predict(model, training), "rawLabelsPrediction"), 1)
+  expect_equal(typeof(sample$rawLabelsPrediction), "character")
+  expect_equal(sample$rawLabelsPrediction, "0-5yrs")
+
+  # Test summary works on naiveBayes
+  summary.model <- summary(model)
+  expect_equal(length(summary.model$pi), 3)
+  expect_equal(sum(summary.model$pi), 1)
+  l1 <- summary.model$theta[1, ]
+  l2 <- summary.model$theta[2, ]
+  expect_equal(sum(unlist(l1)), 1)
+  expect_equal(sum(unlist(l2)), 1)
+
+  # Test e1071::naiveBayes
+  if (requireNamespace("e1071", quietly = TRUE)) {
+    expect_that(m <- e1071::naiveBayes(education ~ ., data = infert), not(throws_error()))
+    expect_equal(as.character(predict(m, infert[1, ])), "0-5yrs")
+  }
+})
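The e1071 branch above only checks that a local fit runs and predicts the same first label. A rough local cross-check of the class priors is possible too; this is a sketch, not part of the test, and the two estimators differ (e1071 fits Gaussian/categorical conditionals, while SparkR fits a multinomial model on dummy-coded features), so only loose agreement should be expected:

```r
# Sketch: compare e1071's class priors with summary(model)$pi locally.
if (requireNamespace("e1071", quietly = TRUE)) {
  m <- e1071::naiveBayes(education ~ ., data = infert, laplace = 1)
  prop.table(m$apriori)                  # class priors; roughly summary(model)$pi
  as.character(predict(m, infert[1, ]))  # "0-5yrs"
}
```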
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 483ef0d88ca6..c4b4eebb65b1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.PredictorParams
+import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes}
@@ -104,7 +105,12 @@ class NaiveBayes @Since("1.5.0") (
   override protected def train(dataset: DataFrame): NaiveBayesModel = {
     val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset)
     val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val nbModel = copyValues(NaiveBayesModel.fromOld(oldModel, this))
+    val attr = AttributeGroup.fromStructField(dataset.schema($(featuresCol))).attributes
+    if (attr.isDefined) {
+      nbModel.setFeatureNames(attr.get.map(_.name.getOrElse("NA")))
+    }
+    nbModel
   }

   @Since("1.5.0")
@@ -227,6 +233,21 @@ class NaiveBayesModel private[ml] (

   @Since("1.6.0")
   override def write: MLWriter = new NaiveBayesModel.NaiveBayesModelWriter(this)
+
+  private var featureNames: Option[Array[String]] = None
+
+  private[classification] def setFeatureNames(names: Array[String]): this.type = {
+    this.featureNames = Some(names)
+    this
+  }
+
+  private[ml] def getFeatureNames: Array[String] = featureNames match {
+    case Some(names) => names
+    case None =>
+      throw new SparkException(
+        s"No training result available for the ${this.getClass.getSimpleName}",
+        new NullPointerException())
+  }
 }

 @Since("1.6.0")
@@ -237,7 +258,6 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] {
       oldModel: OldNaiveBayesModel,
       parent: NaiveBayes): NaiveBayesModel = {
     val uid = if (parent != null) parent.uid else Identifiable.randomUID("nb")
-    val labels = Vectors.dense(oldModel.labels)
     val pi = Vectors.dense(oldModel.pi)
     val theta = new DenseMatrix(oldModel.labels.length, oldModel.theta(0).length,
       oldModel.theta.flatten, true)
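The `train()` change above copies feature names out of the ML attribute metadata that `RFormula` attaches to the features column, falling back to "NA" for unnamed attributes; this is what lets the R `summary()` label the columns of `theta`. A hedged R-side illustration (the exact names depend on how RFormula dummy-codes `infert`'s factor columns):

```r
# Sketch: feature names captured during training surface as theta's column names.
# Assumes the SparkR session and sqlContext from the earlier sketch.
df <- createDataFrame(sqlContext, infert)
model <- naiveBayes(education ~ age + parity, df)
colnames(summary(model)$theta)  # e.g. c("age", "parity")
```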
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index e7ca7ada74c8..ee68a1d12757 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -257,6 +257,18 @@ class RFormulaModel private[feature](
       "Label column already exists and is not of type DoubleType.")
   }

+  /**
+   * Get the original array of labels, if it exists.
+   */
+  private[ml] def getOriginalLabels: Option[Array[String]] = {
+    // Given the sequence of transformers RFormula produces, if the last stage is a
+    // StringIndexerModel then the original labels can be extracted from it.
+    pipelineModel.stages.last match {
+      case m: StringIndexerModel => Some(m.labels)
+      case _ => None
+    }
+  }
+
   @Since("2.0.0")
   override def write: MLWriter = new RFormulaModel.RFormulaModelWriter(this)
 }
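When the formula's label is a string column, the pipeline fitted by `RFormula` ends in a `StringIndexerModel` whose `labels` array holds the original strings in index order; `fitNaiveBayes` in the next file feeds those labels to an `IndexToString` stage so predictions are decoded back to strings. An illustrative R view of the effect, assuming the `df` and `model` from the earlier sketches:

```r
# Sketch: predictions come back as the original string labels, not indices.
pred <- predict(model, df)
take(select(pred, "rawLabelsPrediction"), 3)  # character values such as "0-5yrs"
```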
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
index d23e4fc9d1f5..bbe7c0739575 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/SparkRWrappers.scala
@@ -17,11 +17,11 @@

 package org.apache.spark.ml.api.r

-import org.apache.spark.ml.{Pipeline, PipelineModel}
+import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
 import org.apache.spark.ml.attribute._
-import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
+import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, NaiveBayes, NaiveBayesModel}
 import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
-import org.apache.spark.ml.feature.{RFormula, VectorAssembler}
+import org.apache.spark.ml.feature._
 import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
 import org.apache.spark.sql.DataFrame
@@ -52,6 +52,45 @@ private[r] object SparkRWrappers {
     pipeline.fit(df)
   }

+  def fitNaiveBayes(
+      value: String,
+      df: DataFrame,
+      lambda: Double,
+      modelType: String): PipelineModel = {
+
+    // Transform data with RFormula
+    val formula = new RFormula().setFormula(value)
+    val fModel = formula.fit(df)
+    val rawLabels = fModel.getOriginalLabels
+
+    val naiveBayes = new NaiveBayes().setSmoothing(lambda).setModelType(modelType)
+    val rawLabelsIndexer = new IndexToString()
+      .setInputCol(naiveBayes.getLabelCol).setOutputCol("rawLabelsPrediction")
+
+    if (fModel.getOriginalLabels.isDefined) {
+      // String labels have already been re-indexed by RFormula.
+      val stages: Array[PipelineStage] =
+        Array(fModel, naiveBayes, rawLabelsIndexer.setLabels(rawLabels.get))
+      new Pipeline().setStages(stages).fit(df)
+    } else {
+      // Re-index numerical labels for NaiveBayes, since it assumes labels are indices.
+      val labelIndexer = new StringIndexer().setInputCol(fModel.getLabelCol).fit(df)
+      val stages: Array[PipelineStage] =
+        Array(
+          labelIndexer,
+          fModel,
+          naiveBayes.setLabelCol(labelIndexer.getOutputCol),
+          rawLabelsIndexer.setLabels(labelIndexer.labels))
+      new Pipeline().setStages(stages).fit(df)
+    }
+  }
+
+  def isNaiveBayesModel(model: PipelineModel): Boolean = {
+    model.stages.length >= 2 &&
+      model.stages(model.stages.length - 2).isInstanceOf[NaiveBayesModel] &&
+      model.stages.last.isInstanceOf[IndexToString]
+  }
+
   def fitKMeans(
       df: DataFrame,
       initMode: String,
@@ -70,7 +109,7 @@ private[r] object SparkRWrappers {

   def getModelCoefficients(model: PipelineModel): Array[Double] = {
     model.stages.last match {
-      case m: LinearRegressionModel => {
+      case m: LinearRegressionModel =>
         val coefficientStandardErrorsR = Array(m.summary.coefficientStandardErrors.last) ++
           m.summary.coefficientStandardErrors.dropRight(1)
         val tValuesR = Array(m.summary.tValues.last) ++ m.summary.tValues.dropRight(1)
@@ -81,16 +120,15 @@ private[r] object SparkRWrappers {
         } else {
           m.coefficients.toArray ++ coefficientStandardErrorsR ++ tValuesR ++ pValuesR
         }
-      }
-      case m: LogisticRegressionModel => {
+      case m: LogisticRegressionModel =>
         if (m.getFitIntercept) {
           Array(m.intercept) ++ m.coefficients.toArray
         } else {
           m.coefficients.toArray
         }
-      }
       case m: KMeansModel =>
         m.clusterCenters.flatMap(_.toArray)
+      case _ if isNaiveBayesModel(model) => Array() // A dummy result to prevent a match error.
     }
   }
@@ -129,7 +167,28 @@ private[r] object SparkRWrappers {
     }
   }

-  def getModelFeatures(model: PipelineModel): Array[String] = {
+  /**
+   * Extract label names for a NaiveBayesModel.
+   */
+  def getNaiveBayesLabels(model: PipelineModel): Array[String] = {
+    assert(isNaiveBayesModel(model),
+      s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.")
+    model.stages.last.asInstanceOf[IndexToString].getLabels
+  }
+
+  def getNaiveBayesPi(model: PipelineModel): Array[Double] = {
+    assert(isNaiveBayesModel(model),
+      s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.")
+    model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].pi.toArray.map(math.exp)
+  }
+
+  def getNaiveBayesTheta(model: PipelineModel): Array[Double] = {
+    assert(isNaiveBayesModel(model),
+      s"NaiveBayesModel required but ${model.stages.last.getClass.getSimpleName} found.")
+    model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].theta.toArray.map(math.exp)
+  }
+
+  def getModelFeatures(model: PipelineModel): Array[String] = {
     model.stages.last match {
       case m: LinearRegressionModel =>
         val attrs = AttributeGroup.fromStructField(
@@ -151,17 +210,17 @@ private[r] object SparkRWrappers {
         val attrs = AttributeGroup.fromStructField(
           m.summary.predictions.schema(m.summary.featuresCol))
         attrs.attributes.get.map(_.name.get)
+      case _ if isNaiveBayesModel(model) =>
+        model.stages(model.stages.length - 2).asInstanceOf[NaiveBayesModel].getFeatureNames
     }
   }

   def getModelName(model: PipelineModel): String = {
     model.stages.last match {
-      case m: LinearRegressionModel =>
-        "LinearRegressionModel"
-      case m: LogisticRegressionModel =>
-        "LogisticRegressionModel"
-      case m: KMeansModel =>
-        "KMeansModel"
+      case m: LinearRegressionModel => "LinearRegressionModel"
+      case m: LogisticRegressionModel => "LogisticRegressionModel"
+      case m: KMeansModel => "KMeansModel"
+      case _ if isNaiveBayesModel(model) => "NaiveBayesModel"
     }
   }
 }
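Two details of the wrapper are easy to miss. First, `fitNaiveBayes` builds one of two pipelines: if `RFormula` already indexed a string label, the stages are RFormula, NaiveBayes, IndexToString; otherwise a `StringIndexer` is prepended to re-index a numeric label, since NaiveBayes assumes labels are category indices. Second, `NaiveBayesModel` stores `pi` and `theta` in log space, which is why `getNaiveBayesPi` and `getNaiveBayesTheta` apply `math.exp` before handing values back to R. A sketch of both label types from the R side, reusing the `df` assumed in the earlier sketches:

```r
# Sketch: both string and numeric labels route through fitNaiveBayes.
m1 <- naiveBayes(education ~ age + parity, df)  # string label: already indexed by RFormula
m2 <- naiveBayes(case ~ age + parity, df)       # numeric 0/1 label: re-indexed via StringIndexer
summary(m2)$pi  # exponentiated priors, so the entries sum to 1
```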