@@ -29,6 +29,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.DecisionTreeModelReadWrite._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, Strategy => OldStrategy}
import org.apache.spark.mllib.tree.model.{DecisionTreeModel => OldDecisionTreeModel}
import org.apache.spark.rdd.RDD
@@ -96,8 +97,10 @@ class DecisionTreeClassifier @Since("1.4.0") (
@Since("1.6.0")
override def setSeed(value: Long): this.type = set(seed, value)

override protected def train(dataset: Dataset[_]): DecisionTreeClassificationModel = {
val instr = Instrumentation.create(this, dataset)
override protected def train(
dataset: Dataset[_]): DecisionTreeClassificationModel = instrumented { instr =>
instr.logPipelineStage(this)
instr.logDataset(dataset)
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val numClasses: Int = getNumClasses(dataset)
@@ -112,30 +115,27 @@ class DecisionTreeClassifier @Since("1.4.0") (
val oldDataset: RDD[LabeledPoint] = extractLabeledPoints(dataset, numClasses)
val strategy = getOldStrategy(categoricalFeatures, numClasses)

instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
cacheNodeIds, checkpointInterval, impurity, seed)

val trees = RandomForest.run(oldDataset, strategy, numTrees = 1, featureSubsetStrategy = "all",
seed = $(seed), instr = Some(instr), parentUID = Some(uid))

val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]
instr.logSuccess(m)
m
trees.head.asInstanceOf[DecisionTreeClassificationModel]
}

/** (private[ml]) Train a decision tree on an RDD */
private[ml] def train(data: RDD[LabeledPoint],
oldStrategy: OldStrategy): DecisionTreeClassificationModel = {
val instr = Instrumentation.create(this, data)
instr.logParams(maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
oldStrategy: OldStrategy): DecisionTreeClassificationModel = instrumented { instr =>
instr.logPipelineStage(this)
instr.logDataset(data)
instr.logParams(this, maxDepth, maxBins, minInstancesPerNode, minInfoGain, maxMemoryInMB,
cacheNodeIds, checkpointInterval, impurity, seed)

val trees = RandomForest.run(data, oldStrategy, numTrees = 1, featureSubsetStrategy = "all",
seed = 0L, instr = Some(instr), parentUID = Some(uid))

val m = trees.head.asInstanceOf[DecisionTreeClassificationModel]
instr.logSuccess(m)
m
trees.head.asInstanceOf[DecisionTreeClassificationModel]
}

/** (private[ml]) Create a Strategy instance to use with the old API. */
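Every file in this diff gets the same treatment: `val instr = Instrumentation.create(this, dataset)` plus an explicit `instr.logSuccess(model)` is replaced by wrapping the whole training body in `Instrumentation.instrumented { instr => ... }`, logging the pipeline stage, dataset, and params (now passing `this` as the first argument to `logParams`) up front, and returning the model as the block's last expression. Below is a minimal, self-contained sketch of that shape; the `Instrumentation` stub and the body of `instrumented` are simplified assumptions for illustration, not the actual Spark internals.

// Hypothetical stand-in for org.apache.spark.ml.util.Instrumentation; the real class
// does structured, MDC-tagged logging and takes ML types. Only the control flow matters here.
class Instrumentation {
  def logPipelineStage(stage: AnyRef): Unit = println(s"stage: ${stage.getClass.getSimpleName}")
  def logDataset(data: AnyRef): Unit = println(s"dataset: $data")
  def logParams(stage: AnyRef, params: String*): Unit =
    println(s"params of ${stage.getClass.getSimpleName}: ${params.mkString(", ")}")
  def logSuccess(): Unit = println("training succeeded")
  def logFailure(e: Throwable): Unit = println(s"training failed: ${e.getMessage}")
}

object Instrumentation {
  // instrumented { instr => ... } owns the success/failure bookkeeping, so each train()
  // body can simply return the model as its last expression.
  def instrumented[T](body: Instrumentation => T): T = {
    val instr = new Instrumentation
    try {
      val result = body(instr)
      instr.logSuccess()
      result
    } catch {
      case e: Throwable =>
        instr.logFailure(e)
        throw e
    }
  }
}

object InstrumentedTrainExample extends App {
  import Instrumentation.instrumented

  // Mirrors the rewritten train() methods: log the stage, dataset, and params first,
  // then return the model; no explicit logSuccess call is needed any more.
  val model: String = instrumented { instr =>
    instr.logPipelineStage(this)
    instr.logDataset("training data")
    instr.logParams(this, "maxDepth", "maxBins", "seed")
    "trained-model"
  }
  println(model)
}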
@@ -31,9 +31,9 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.GradientBoostedTrees
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._

@@ -152,7 +152,8 @@ class GBTClassifier @Since("1.4.0") (
set(validationIndicatorCol, value)
}

override protected def train(dataset: Dataset[_]): GBTClassificationModel = {
override protected def train(
dataset: Dataset[_]): GBTClassificationModel = instrumented { instr =>
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))

@@ -189,8 +190,9 @@ class GBTClassifier @Since("1.4.0") (
s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
}

val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, featuresCol, predictionCol, impurity, lossType,
maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy,
validationIndicatorCol)
@@ -204,9 +206,7 @@ class GBTClassifier @Since("1.4.0") (
GradientBoostedTrees.run(trainDataset, boostingStrategy, $(seed), $(featureSubsetStrategy))
}

val m = new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
instr.logSuccess(m)
m
new GBTClassificationModel(uid, baseLearners, learnerWeights, numFeatures)
}

@Since("1.4.1")
@@ -33,6 +33,7 @@ import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.VectorImplicits._
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD
@@ -162,16 +163,17 @@ class LinearSVC @Since("2.2.0") (
@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)

override protected def train(dataset: Dataset[_]): LinearSVCModel = {
override protected def train(dataset: Dataset[_]): LinearSVCModel = instrumented { instr =>
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
val instances: RDD[Instance] =
dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}

val instr = Instrumentation.create(this, dataset)
instr.logParams(regParam, maxIter, fitIntercept, tol, standardization, threshold,
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, regParam, maxIter, fitIntercept, tol, standardization, threshold,
aggregationDepth)

val (summarizer, labelSummarizer) = {
@@ -276,9 +278,7 @@ class LinearSVC @Since("2.2.0") (
(Vectors.dense(coefficientArray), intercept, scaledObjectiveHistory.result())
}

val model = copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
instr.logSuccess(model)
model
copyValues(new LinearSVCModel(uid, coefficientVector, interceptVector))
}
}

@@ -503,7 +503,7 @@ class LogisticRegression @Since("1.2.0") (

instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(regParam, elasticNetParam, standardization, threshold,
instr.logParams(this, regParam, elasticNetParam, standardization, threshold,
maxIter, tol, fitIntercept)

val (summarizer, labelSummarizer) = {
@@ -28,6 +28,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.Dataset

/** Params for Multilayer Perceptron. */
@@ -230,9 +231,11 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
* @param dataset Training dataset
* @return Fitted model
*/
override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, layers, maxIter, tol,
override protected def train(
dataset: Dataset[_]): MultilayerPerceptronClassificationModel = instrumented { instr =>
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, featuresCol, predictionCol, layers, maxIter, tol,
blockSize, solver, stepSize, seed)

val myLayers = $(layers)
@@ -264,10 +267,7 @@ class MultilayerPerceptronClassifier @Since("1.5.0") (
}
trainer.setStackSize($(blockSize))
val mlpModel = trainer.train(data)
val model = new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)

instr.logSuccess(model)
model
new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
}
}

@@ -25,6 +25,7 @@ import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.{DoubleParam, Param, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.HasWeightCol
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, lit}
@@ -125,8 +126,9 @@ class NaiveBayes @Since("1.5.0") (
*/
private[spark] def trainWithLabelCheck(
dataset: Dataset[_],
positiveLabel: Boolean): NaiveBayesModel = {
val instr = Instrumentation.create(this, dataset)
positiveLabel: Boolean): NaiveBayesModel = instrumented { instr =>
instr.logPipelineStage(this)
instr.logDataset(dataset)
if (positiveLabel && isDefined(thresholds)) {
val numClasses = getNumClasses(dataset)
instr.logNumClasses(numClasses)
@@ -148,7 +150,7 @@
}
}

instr.logParams(labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, rawPredictionCol,
probabilityCol, modelType, smoothing, thresholds)

val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
@@ -204,9 +206,7 @@

val pi = Vectors.dense(piArray)
val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
val model = new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
instr.logSuccess(model)
model
new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
}

@Since("1.5.0")
@@ -36,6 +36,7 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
@@ -362,11 +363,12 @@ final class OneVsRest @Since("1.4.0") (
}

@Since("2.0.0")
override def fit(dataset: Dataset[_]): OneVsRestModel = {
override def fit(dataset: Dataset[_]): OneVsRestModel = instrumented { instr =>
transformSchema(dataset.schema)

val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol)
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, featuresCol, predictionCol, parallelism, rawPredictionCol)
instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)

// determine number of classes either from metadata if provided, or via computation.
@@ -440,7 +442,6 @@ final class OneVsRest @Since("1.4.0") (
case attr: Attribute => attr
}
val model = new OneVsRestModel(uid, labelAttribute.toMetadata(), models).setParent(this)
instr.logSuccess(model)
copyValues(model)
}

@@ -28,6 +28,7 @@ import org.apache.spark.ml.tree._
import org.apache.spark.ml.tree.impl.RandomForest
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.DefaultParamsReader.Metadata
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
import org.apache.spark.rdd.RDD
@@ -115,8 +116,10 @@ class RandomForestClassifier @Since("1.4.0") (
override def setFeatureSubsetStrategy(value: String): this.type =
set(featureSubsetStrategy, value)

override protected def train(dataset: Dataset[_]): RandomForestClassificationModel = {
val instr = Instrumentation.create(this, dataset)
override protected def train(
dataset: Dataset[_]): RandomForestClassificationModel = instrumented { instr =>
instr.logPipelineStage(this)
instr.logDataset(dataset)
val categoricalFeatures: Map[Int, Int] =
MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
val numClasses: Int = getNumClasses(dataset)
@@ -131,7 +134,7 @@
val strategy =
super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)

instr.logParams(labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
instr.logParams(this, labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)

@@ -140,11 +143,9 @@
.map(_.asInstanceOf[DecisionTreeClassificationModel])

val numFeatures = oldDataset.first().features.size
val m = new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)
instr.logNumClasses(numClasses)
instr.logNumFeatures(numFeatures)
instr.logSuccess(m)
m
new RandomForestClassificationModel(uid, trees, numFeatures, numClasses)
}

@Since("1.4.1")
@@ -26,6 +26,7 @@ import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
BisectingKMeansModel => MLlibBisectingKMeansModel}
import org.apache.spark.mllib.linalg.VectorImplicits._
@@ -257,12 +258,13 @@ class BisectingKMeans @Since("2.0.0") (
def setDistanceMeasure(value: String): this.type = set(distanceMeasure, value)

@Since("2.0.0")
override def fit(dataset: Dataset[_]): BisectingKMeansModel = {
override def fit(dataset: Dataset[_]): BisectingKMeansModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)
val rdd = DatasetUtils.columnToOldVector(dataset, getFeaturesCol)

val instr = Instrumentation.create(this, dataset)
instr.logParams(featuresCol, predictionCol, k, maxIter, seed,
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, featuresCol, predictionCol, k, maxIter, seed,
minDivisibleClusterSize, distanceMeasure)

val bkm = new MLlibBisectingKMeans()
@@ -275,10 +277,8 @@
val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
val summary = new BisectingKMeansSummary(
model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter))
model.setSummary(Some(summary))
instr.logNamedValue("clusterSizes", summary.clusterSizes)
instr.logSuccess(model)
model
model.setSummary(Some(summary))
}

@Since("2.0.0")
@@ -29,6 +29,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.stat.distribution.MultivariateGaussian
import org.apache.spark.ml.util._
import org.apache.spark.ml.util.Instrumentation.instrumented
import org.apache.spark.mllib.linalg.{Matrices => OldMatrices, Matrix => OldMatrix,
Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.rdd.RDD
@@ -335,7 +336,7 @@ class GaussianMixture @Since("2.0.0") (
private val numSamples = 5

@Since("2.0.0")
override def fit(dataset: Dataset[_]): GaussianMixtureModel = {
override def fit(dataset: Dataset[_]): GaussianMixtureModel = instrumented { instr =>
transformSchema(dataset.schema, logging = true)

val sc = dataset.sparkSession.sparkContext
@@ -352,8 +353,9 @@
s"than ${GaussianMixture.MAX_NUM_FEATURES} features because the size of the covariance" +
s" matrix is quadratic in the number of features.")

val instr = Instrumentation.create(this, dataset)
instr.logParams(featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, featuresCol, predictionCol, probabilityCol, k, maxIter, seed, tol)
instr.logNumFeatures(numFeatures)

val shouldDistributeGaussians = GaussianMixture.shouldDistributeGaussians(
@@ -425,11 +427,9 @@
val model = copyValues(new GaussianMixtureModel(uid, weights, gaussianDists)).setParent(this)
val summary = new GaussianMixtureSummary(model.transform(dataset),
$(predictionCol), $(probabilityCol), $(featuresCol), $(k), logLikelihood, iter)
model.setSummary(Some(summary))
instr.logNamedValue("logLikelihood", logLikelihood)
instr.logNamedValue("clusterSizes", summary.clusterSizes)
instr.logSuccess(model)
model
model.setSummary(Some(summary))
}

@Since("2.0.0")