Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
5650e98
Changed CrossValidator and TrainValidationSplit fit methods to evalua…
BryanCutler Jan 31, 2017
36a1a68
made closure vars more explicit, moved param default to trait
BryanCutler Feb 14, 2017
b051afa
added paramvalidator for numParallelEval to ensure >=1
BryanCutler Feb 14, 2017
46fe252
added test cases for CrossValidation and TrainValidationSplit
BryanCutler Feb 15, 2017
1274ba4
added numParallelEval param usage to examples
BryanCutler Feb 15, 2017
80ac2fd
added documentation to ml-tuning
BryanCutler Feb 16, 2017
8126710
changed sliding window limit to use a semaphore instead to prevent wa…
BryanCutler Feb 16, 2017
6a9b735
added note about parallelism capped by Scala collection thread pool, …
BryanCutler Feb 16, 2017
1c2e391
reworked to use ExecutorService and Futures
BryanCutler Feb 28, 2017
9e055cd
fixed wildcard import
BryanCutler Feb 28, 2017
97ad7b4
made doc changes
BryanCutler Apr 11, 2017
5e8a086
changed ExecutorService factory to a trait to be compatible with Java
BryanCutler Apr 12, 2017
864c99c
Merge remote-tracking branch 'upstream/master' into parallel-model-ev…
BryanCutler Jun 13, 2017
ad8a870
Changed ExecutorService to be set explicitly instead of factory
BryanCutler Jun 14, 2017
911af1d
added HasParallelism trait
BryanCutler Aug 23, 2017
658aacb
Updated to use Trait HasParallelsim
BryanCutler Aug 23, 2017
2c73b0b
fixed up docs
BryanCutler Aug 23, 2017
7a8221b
removed blas calculation for CrossValidator metric calc, was not nece…
BryanCutler Sep 5, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/ml-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ for multiclass problems. The default metric used to choose the best `ParamMap` c
method in each of these evaluators.

To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility.
By default, sets of parameters from the parameter grid are evaluated in serial. Parameter evaluation can be done in parallel by setting `parallelism` with a value of 2 or more (a value of 1 will be serial) before running model selection with `CrossValidator` or `TrainValidationSplit` (NOTE: this is not yet supported in Python).
The value of `parallelism` should be chosen carefully to maximize parallelism without exceeding cluster resources, and larger values may not always lead to improved performance. Generally speaking, a value up to 10 should be sufficient for most clusters.

# Cross-Validation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ public static void main(String[] args) {
CrossValidator cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid).setNumFolds(2); // Use 3+ in practice
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2) // Use 3+ in practice
.setParallelism(2); // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
CrossValidatorModel cvModel = cv.fit(training);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public static void main(String[] args) {
.setEstimator(lr)
.setEvaluator(new RegressionEvaluator())
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8); // 80% for training and the remaining 20% for validation
.setTrainRatio(0.8) // 80% for training and the remaining 20% for validation
.setParallelism(2); // Evaluate up to 2 parameter settings in parallel

// Run train validation split, and choose the best set of parameters.
TrainValidationSplitModel model = trainValidationSplit.fit(training);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ object ModelSelectionViaCrossValidationExample {
.setEvaluator(new BinaryClassificationEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2) // Use 3+ in practice
.setParallelism(2) // Evaluate up to 2 parameter settings in parallel

// Run cross-validation, and choose the best set of parameters.
val cvModel = cv.fit(training)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ object ModelSelectionViaTrainValidationSplitExample {
.setEstimatorParamMaps(paramGrid)
// 80% of the data will be used for training and the remaining 20% for validation.
.setTrainRatio(0.8)
// Evaluate up to 2 parameter settings in parallel
.setParallelism(2)

// Run train validation split, and choose the best set of parameters.
val model = trainValidationSplit.fit(training)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.param.shared

import scala.concurrent.ExecutionContext

import org.apache.spark.ml.param.{IntParam, Params, ParamValidators}
import org.apache.spark.util.ThreadUtils

/**
* Trait to define a level of parallelism for algorithms that are able to use
* multithreaded execution, and provide a thread-pool based execution context.
*/
private[ml] trait HasParallelism extends Params {

/**
* The number of threads to use when running parallel algorithms.
* Default is 1 for serial execution
*
* @group expertParam
*/
val parallelism = new IntParam(this, "parallelism",
"the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1))

setDefault(parallelism -> 1)

/** @group expertGetParam */
def getParallelism: Int = $(parallelism)

/**
* Create a new execution context with a thread-pool that has a maximum number of threads
* set to the value of [[parallelism]]. If this param is set to 1, a same-thread executor
* will be used to run in serial.
*/
private[ml] def getExecutionContext: ExecutionContext = {
getParallelism match {
case 1 =>
ThreadUtils.sameThread
case n =>
ExecutionContext.fromExecutorService(ThreadUtils
.newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@ package org.apache.spark.ml.tuning
import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.Duration

import com.github.fommil.netlib.F2jBLAS
import org.apache.hadoop.fs.Path
import org.json4s.DefaultFormats

import org.apache.spark.annotation.Since
import org.apache.spark.internal.Logging
import org.apache.spark.ml._
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.HasParallelism
import org.apache.spark.ml.util._
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils

/**
* Params for [[CrossValidator]] and [[CrossValidatorModel]].
Expand Down Expand Up @@ -64,13 +67,11 @@ private[ml] trait CrossValidatorParams extends ValidatorParams {
@Since("1.2.0")
class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
extends Estimator[CrossValidatorModel]
with CrossValidatorParams with MLWritable with Logging {
with CrossValidatorParams with HasParallelism with MLWritable with Logging {

@Since("1.2.0")
def this() = this(Identifiable.randomUID("cv"))

private val f2jBLAS = new F2jBLAS

/** @group setParam */
@Since("1.2.0")
def setEstimator(value: Estimator[_]): this.type = set(estimator, value)
Expand All @@ -91,6 +92,15 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
@Since("2.0.0")
def setSeed(value: Long): this.type = set(seed, value)

/**
* Set the mamixum level of parallelism to evaluate models in parallel.
* Default is 1 for serial evaluation
*
* @group expertSetParam
*/
@Since("2.3.0")
def setParallelism(value: Int): this.type = set(parallelism, value)

@Since("2.0.0")
override def fit(dataset: Dataset[_]): CrossValidatorModel = {
val schema = dataset.schema
Expand All @@ -99,32 +109,49 @@ class CrossValidator @Since("1.2.0") (@Since("1.4.0") override val uid: String)
val est = $(estimator)
val eval = $(evaluator)
val epm = $(estimatorParamMaps)
val numModels = epm.length
val metrics = new Array[Double](epm.length)

// Create execution context based on $(parallelism)
val executionContext = getExecutionContext

val instr = Instrumentation.create(this, dataset)
instr.logParams(numFolds, seed)
instr.logParams(numFolds, seed, parallelism)
logTuningParams(instr)

// Compute metrics for each model over each split
val splits = MLUtils.kFold(dataset.toDF.rdd, $(numFolds), $(seed))
splits.zipWithIndex.foreach { case ((training, validation), splitIndex) =>
val metrics = splits.zipWithIndex.map { case ((training, validation), splitIndex) =>
val trainingDataset = sparkSession.createDataFrame(training, schema).cache()
val validationDataset = sparkSession.createDataFrame(validation, schema).cache()
// multi-model training
logDebug(s"Train split $splitIndex with multiple sets of parameters.")
val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
trainingDataset.unpersist()
var i = 0
while (i < numModels) {
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)))
logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
metrics(i) += metric
i += 1

// Fit models in a Future for training in parallel
val modelFutures = epm.map { paramMap =>
Future[Model[_]] {
val model = est.fit(trainingDataset, paramMap)
model.asInstanceOf[Model[_]]
} (executionContext)
}

// Unpersist training data only when all models have trained
Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
.onComplete { _ => trainingDataset.unpersist() } (executionContext)

// Evaluate models in a Future that will calulate a metric and allow model to be cleaned up
val foldMetricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
modelFuture.map { model =>
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(model.transform(validationDataset, paramMap))
logDebug(s"Got metric $metric for model trained with $paramMap.")
metric
} (executionContext)
}

// Wait for metrics to be calculated before unpersisting validation dataset
val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
validationDataset.unpersist()
}
f2jBLAS.dscal(numModels, 1.0 / $(numFolds), metrics, 1)
foldMetrics
}.transpose.map(_.sum / $(numFolds)) // Calculate average metric over all splits

logInfo(s"Average cross-validation metrics: ${metrics.toSeq}")
val (bestMetric, bestIndex) =
if (eval.isLargerBetter) metrics.zipWithIndex.maxBy(_._1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.spark.ml.tuning
import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.language.existentials

import org.apache.hadoop.fs.Path
Expand All @@ -30,9 +32,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.evaluation.Evaluator
import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators}
import org.apache.spark.ml.param.shared.HasParallelism
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils

/**
* Params for [[TrainValidationSplit]] and [[TrainValidationSplitModel]].
Expand Down Expand Up @@ -62,7 +66,7 @@ private[ml] trait TrainValidationSplitParams extends ValidatorParams {
@Since("1.5.0")
class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: String)
extends Estimator[TrainValidationSplitModel]
with TrainValidationSplitParams with MLWritable with Logging {
with TrainValidationSplitParams with HasParallelism with MLWritable with Logging {

@Since("1.5.0")
def this() = this(Identifiable.randomUID("tvs"))
Expand All @@ -87,37 +91,62 @@ class TrainValidationSplit @Since("1.5.0") (@Since("1.5.0") override val uid: St
@Since("2.0.0")
def setSeed(value: Long): this.type = set(seed, value)

/**
* Set the mamixum level of parallelism to evaluate models in parallel.
* Default is 1 for serial evaluation
*
* @group expertSetParam
*/
@Since("2.3.0")
def setParallelism(value: Int): this.type = set(parallelism, value)

@Since("2.0.0")
override def fit(dataset: Dataset[_]): TrainValidationSplitModel = {
val schema = dataset.schema
transformSchema(schema, logging = true)
val est = $(estimator)
val eval = $(evaluator)
val epm = $(estimatorParamMaps)
val numModels = epm.length
val metrics = new Array[Double](epm.length)

// Create execution context based on $(parallelism)
val executionContext = getExecutionContext

val instr = Instrumentation.create(this, dataset)
instr.logParams(trainRatio, seed)
instr.logParams(trainRatio, seed, parallelism)
logTuningParams(instr)

val Array(trainingDataset, validationDataset) =
dataset.randomSplit(Array($(trainRatio), 1 - $(trainRatio)), $(seed))
trainingDataset.cache()
validationDataset.cache()

// multi-model training
// Fit models in a Future for training in parallel
logDebug(s"Train split with multiple sets of parameters.")
val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
trainingDataset.unpersist()
var i = 0
while (i < numModels) {
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)))
logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
metrics(i) += metric
i += 1
val modelFutures = epm.map { paramMap =>
Future[Model[_]] {
val model = est.fit(trainingDataset, paramMap)
model.asInstanceOf[Model[_]]
} (executionContext)
}

// Unpersist training data only when all models have trained
Future.sequence[Model[_], Iterable](modelFutures)(implicitly, executionContext)
.onComplete { _ => trainingDataset.unpersist() } (executionContext)

// Evaluate models in a Future that will calulate a metric and allow model to be cleaned up
val metricFutures = modelFutures.zip(epm).map { case (modelFuture, paramMap) =>
modelFuture.map { model =>
// TODO: duplicate evaluator to take extra params from input
val metric = eval.evaluate(model.transform(validationDataset, paramMap))
logDebug(s"Got metric $metric for model trained with $paramMap.")
metric
} (executionContext)
}

// Wait for all metrics to be calculated
val metrics = metricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))

// Unpersist validation set once all metrics have been produced
validationDataset.unpersist()

logInfo(s"Train validation split metrics: ${metrics.toSeq}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,33 @@ class CrossValidatorSuite
}
}

test("cross validation with parallel evaluation") {
val lr = new LogisticRegression
val lrParamMaps = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.001, 1000.0))
.addGrid(lr.maxIter, Array(0, 3))
.build()
val eval = new BinaryClassificationEvaluator
val cv = new CrossValidator()
.setEstimator(lr)
.setEstimatorParamMaps(lrParamMaps)
.setEvaluator(eval)
.setNumFolds(2)
.setParallelism(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add .setSeed(XXX)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the seed param here is fixed by default and doesn't need to be set to ensure consistent results. I think that's why it's not set in the other tests in this suite. I'm not a fan of this behavior and I think it's better to explicitly set in tests, but then we should probably be consistent and set elsewhere too. What are your thoughts on this @MLnick ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I agree.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah seed defaults to a hash of the class name. There has been debate over this (see SPARK-16832). Personally I also don't like that behavior, but for now that's what it is.

val cvSerialModel = cv.fit(dataset)
cv.setParallelism(2)
Copy link
Member

@felixcheung felixcheung Aug 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we validate setParallelism is parallelizing?
I mean - can we test and validate that it is functionally working?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little difficult to do this in a unit test without making it flaky. I have run tests manually and verified it is working by both the expected speedup in timing and that the expected number of tasks are run concurrently. I can post some results if that would help.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BryanCutler may be worth posting the result to the JIRA for posterity.

val cvParallelModel = cv.fit(dataset)

val serialMetrics = cvSerialModel.avgMetrics.sorted
val parallelMetrics = cvParallelModel.avgMetrics.sorted
assert(serialMetrics === parallelMetrics)

val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]
val parentParallel = cvParallelModel.bestModel.parent.asInstanceOf[LogisticRegression]
assert(parentSerial.getRegParam === parentParallel.getRegParam)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to also check uid equality here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably be done in the test that already runs checkCopy on line 62 (at least until we cleanup these basic checks). I'll take a look at that.

assert(parentSerial.getMaxIter === parentParallel.getMaxIter)
}

test("read/write: CrossValidator with simple estimator") {
val lr = new LogisticRegression().setMaxIter(3)
val evaluator = new BinaryClassificationEvaluator()
Expand Down
Loading