
Conversation

@BryanCutler
Member

@BryanCutler BryanCutler commented Feb 2, 2017

What changes were proposed in this pull request?

Modified CrossValidator and TrainValidationSplit to be able to evaluate models in parallel for a given parameter grid. The level of parallelism is controlled by a parameter numParallelEval, which sets how many models are scheduled for training/evaluation at once so that those jobs can run concurrently. This is a naive approach that does not check the cluster for available resources, so the user must take care to tune the parameter appropriately. The default value is 1, which trains/evaluates in serial.
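
As an illustration only, a minimal sketch of how the proposed param might be set (the estimator, grid, and evaluator are placeholders; the setter name follows the numParallelEval param proposed here, which later in the PR becomes setParallelism):

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val lr = new LogisticRegression()
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// Train/evaluate up to 2 parameter settings at a time instead of serially.
val cv = new CrossValidator()
  .setEstimator(lr)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(new BinaryClassificationEvaluator())
  .setNumFolds(3)
  .setNumParallelEval(2) // default is 1, i.e. serial evaluation
```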

How was this patch tested?

Added unit tests for CrossValidator and TrainValidationSplit to verify that model selection is the same when run in serial vs parallel. Manual testing to verify tasks run in parallel when param is > 1. Added parameter usage to relevant examples.

@BryanCutler
Member Author

CC @MLnick, this doesn't yet address the issue of double caching, but I wanted to post what I had so far to discuss the additional param and default value. I'll be looking into the caching next.

@SparkQA

SparkQA commented Feb 2, 2017

Test build #72267 has finished for PR 16774 at commit 5650e98.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@MLnick MLnick left a comment

Made a first pass.

Let's add unit tests in both test suites to check that the metric results for default numPar are the same as for numPar > 1.

It would be good to also add a section about the parallel execution option to each validator's doc page.

I think it's important to note in the docs that this will only really work as expected if the FAIR scheduler is enabled; otherwise I don't think things will actually be executed concurrently.
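
For reference, a minimal sketch of enabling the FAIR scheduler via the standard spark.scheduler.mode setting (the SparkSession builder and app name are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// spark.scheduler.mode defaults to FIFO; FAIR lets concurrently submitted
// jobs share executor resources instead of queuing behind each other.
val spark = SparkSession.builder()
  .appName("ParallelTuningExample")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()
```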


// Evaluate models concurrently, limited by using a sliding window over models
val foldMetrics = models.zip(epm).grouped(numPar).map { win =>
win.par.map { m =>
Contributor

I'd like to use a case here to be clearer, so .map { case (model, paramMap) => or similar.
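
A sketch of the suggested rewrite, assuming the names from the quoted hunk (models, epm, numPar, eval, validationDataset) are in scope:

```scala
val foldMetrics = models.zip(epm).grouped(numPar).flatMap { window =>
  window.par.map { case (model, paramMap) =>
    // Destructuring the pair names both the model and its ParamMap explicitly.
    eval.evaluate(model.transform(validationDataset, paramMap))
  }.seq
}.toArray
```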


// Fit models concurrently, limited by using a sliding window over models
val models = epm.grouped(numPar).map { win =>
win.par.map(est.fit(trainingDataset, _))
Contributor

I'd prefer to be a bit more verbose here and use a closure variable like paramMap (or epm to follow existing impl) instead of _, just to be clearer & more readable.
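
A sketch of the same hunk with an explicit parameter name instead of _, again assuming the quoted names are in scope:

```scala
val models = epm.grouped(numPar).map { window =>
  // Naming the ParamMap makes the fit call easier to read than `_`.
  window.par.map(paramMap => est.fit(trainingDataset, paramMap))
}
```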

val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
// Fit models concurrently, limited by using a sliding window over models
val models = epm.grouped(numPar).map { win =>
win.par.map(est.fit(trainingDataset, _))
Contributor

Same comment as for cross-val applies here

}
// Evaluate models concurrently, limited by using a sliding window over models
val metrics = models.zip(epm).grouped(numPar).map { win =>
win.par.map { m =>
Contributor

Same comment as for cross-val applies here

// multi-model training
logDebug(s"Train split with multiple sets of parameters.")
val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]]
// Fit models concurrently, limited by using a sliding window over models
Contributor

perhaps add "sliding window of size 'numPar' over ..." - can add to all instances of this comment

def getNumFolds: Int = $(numFolds)

setDefault(numFolds -> 3)
setDefault(numFolds -> 3, numParallelEval -> 1)
Contributor

May make more sense to put the setDefault call in the parent trait ValidatorParams

* @group param
*/
val numParallelEval: IntParam = new IntParam(this, "numParallelEval",
"max number of models to evaluate in parallel, 1 for serial evaluation")
Contributor

Add note about default value being 1

instr.logParams(numFolds, seed)
logTuningParams(instr)

// Compute metrics for each model over each fold
Contributor

technically it's each estimator rather than model - since both the fitting (estimator or pipeline) and evaluation (model or pipeline model) are done in parallel.

Member Author

Hmm, I was just referencing the general process that a metric is computed by evaluating the model predictions on the validation set, and that this is done at each fold. I don't think that has any bearing on the concurrency here, but I added this just because the process is a little more complicated with the addition of a sliding window...

@BryanCutler
Member Author

Thanks for the review @MLnick! All good suggestions, I'll work on an update and addition of the unit test.

@BryanCutler
Member Author

Regarding the default value of the new parameter, do you think 1, so that it runs in serial as it currently does, is the best option? It's definitely safe, but I wonder if most people will even know that the parameter exists...

Do you know of any possible way to automatically choose a value? I remember a while ago I saw the mllib ALS algorithm automatically choose the block size, but I don't think that's done in the ml version. Maybe there were too many problems doing this...

@MLnick
Contributor

MLnick commented Feb 15, 2017

I'd say coming up with a heuristic or algorithm to automatically set the parallel execution param is going to be pretty challenging, since it depends on the details of the individual pipeline components, cluster resources, etc. So for now let's leave it fixed.

I think setting the default to match current behavior is best (i.e. 1). We can add a section to the docs explaining its effect, and also add a couple lines to each example to illustrate typical usage.

@BryanCutler
Member Author

I think setting the default to match current behavior is best (i.e. 1).

I agree, just wanted to bring it up in case others had differing view since it was a concern in the JIRA.

I think it's important to note in the docs that this will actually only really work as expected if the FAIR scheduler is enabled, otherwise I don't think things will actually be executed concurrently.

I ran some tests with and without the FAIR scheduler enabled. Without the FAIR scheduler, if there are enough resources in the cluster (e.g. cores) then the tasks are run concurrently; if there are not enough resources, the jobs will wait. With the FAIR scheduler the tasks are run concurrently and share the available cores. Both will complete model selection in about the same time, given equal resources, so I think the main benefit of FAIR is for multi-user setups, so that one user's jobs don't get starved.

@SparkQA

SparkQA commented Feb 16, 2017

Test build #73021 has finished for PR 16774 at commit 6a9b735.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler BryanCutler changed the title [SPARK-19357][ML][WIP] Adding parallel model evaluation in ML tuning [SPARK-19357][ML] Adding parallel model evaluation in ML tuning Feb 16, 2017
@BryanCutler
Member Author

@MLnick , I updated this PR:

  • added unit tests
  • added usage of parameter to examples
  • updated ml-tuning.md documentation
  • changed the sliding window to a bounded semaphore to limit parallelism; this way, as soon as one job completes, another gets scheduled without waiting for the entire window to finish - it's a bit cleaner too
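
A rough sketch of the bounded-semaphore idea from the last bullet, assuming est, epm, trainingDataset, and numParallelEval are in scope (this is not the actual diff, and the global execution context is only a placeholder):

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}

// Allow at most numParallelEval fits in flight; as soon as one finishes,
// its permit is released and the next queued fit can start.
val permits = new Semaphore(numParallelEval)
implicit val ec: ExecutionContext = ExecutionContext.global // placeholder pool

val modelFutures = epm.map { paramMap =>
  permits.acquire() // blocks the submitting thread once the limit is reached
  Future {
    try est.fit(trainingDataset, paramMap)
    finally permits.release()
  }
}
```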

Please take a look when you get a chance, thanks!

.setNumFolds(2)
.setNumParallelEval(1)
val cvSerialModel = cv.fit(dataset)
cv.setNumParallelEval(2)
Contributor

this test is not deterministic now. @MLnick, do you know if we have some utilities to retry tests a number of times?

Member Author

Yes, the models could now be evaluated in a different order, but the end result of returning the best model would be deterministic and shouldn't ever change.

Contributor

Yes, I'd say we don't care about deterministic execution order here - we care about the result being the same regardless of execution order.

@thunterdb
Contributor

Thanks for working on this task, this is a much requested feature. While it will work for simple cases in its current shape, it is going to cause some issues for complex deployments (Apache Toree, Livy, Databricks, etc.) because the thread pool that controls the computations is not managed. The default assumption with .par is a lot of quick tasks. With the current implementation, because the same thread pool is going to be shared across all the parallel collections, users are going to encounter some mysterious freezes in other places while the ML models finish training (I am talking from experience here).

While the situation with .par has notably improved with Scala 2.10, it is better to:

  • create a dedicated thread pool for each .fit, which users can replace.
  • use futures whose execution context is tied to the thread pool above.
  • not use semaphores, but instead rely on the thread limit at the thread-pool level to cap the number of concurrent executions.

If you do not do that, users in a shared environment like any of the above will experience some mysterious freezes depending on what other users are doing. Ideally the default resources should be tied to SparkSession, but we can start with a default static pool marked as experimental API.

More concretely, the API should look like this, I believe:

  def setNumParallelEval(num: Int) // Creates an execution context with the given max number of threads
  def setExecutionExecutorService(exec: ExecutorService) // Will use the given executor service instead of an executor service shared by all the ML calculations

See the doc in:
https://twitter.github.io/scala_school/concurrency.html#executor
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html

More about parallel collections:
http://stackoverflow.com/questions/5424496/scala-parallel-collections-degree-of-parallelism
http://stackoverflow.com/questions/14214757/what-is-the-benefit-of-using-futures-over-parallel-collections-in-scala
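
A minimal sketch of the pattern suggested above - a dedicated, replaceable thread pool backing an ExecutionContext, with Futures and no semaphore - assuming est, eval, epm, trainingDataset, validationDataset, and numParallelEval are in scope; this is illustrative, not the PR's code:

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// A dedicated pool: ML fits never compete with the global Scala pool,
// and a caller could swap in their own ExecutorService instead.
val pool: ExecutorService = Executors.newFixedThreadPool(numParallelEval)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

val metricFutures = epm.toSeq.map { paramMap =>
  Future {
    val model = est.fit(trainingDataset, paramMap)
    eval.evaluate(model.transform(validationDataset, paramMap))
  }
}

// The pool size, not a semaphore, caps how many fits run concurrently.
val metrics = Await.result(Future.sequence(metricFutures), Duration.Inf)
pool.shutdown()
```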

@BryanCutler
Member Author

Hi @thunterdb , thanks for the review and all of the details you provided! I agree that a configurable execution service would be needed for running under a shared environment instead of simply using the default context. I'll rework this and make use of Futures too.

@BryanCutler
Member Author

@thunterdb and @MLnick I updated this to use a configurable ExecutorService and Futures instead of Scala parallel collections. The ExecutorService is retrieved via a function so that the thread pool is not initialized until it is needed. When running cross-validation there is a trade-off between waiting for all calculations to complete at each split before continuing to the next, and excessively caching all of the split datasets at once. I chose to be safe and just wait for the calculations at each split to complete, but let me know what you guys think - hopefully this makes sense :)

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73545 has finished for PR 16774 at commit 1c2e391.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73549 has finished for PR 16774 at commit 9e055cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@MLnick MLnick left a comment

Made a pass on these updates.

I think it overall looks pretty good. Mostly a few clean up comments, and also I think we should look at a simpler API / method for setting a custom execution context.

In terms of CrossValidator, I think for now it's fine to do parallel execution within each fold. Otherwise it may be very easy to flood a cluster. We can gather user feedback from usage in the wild and always update at a later point.

method in each of these evaluators.

To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility.
Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`.
Contributor

I think it's a little clearer to say something like "By default, sets of parameters ... will be evaluated in serial. Parameter evaluation can be done in parallel by setting ..."

To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility.
Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`.
The value of `numParallelEval` should be chosen carefully to maximize parallelism without exceeding cluster resources, and will be capped at the number of cores in the driver system. Generally speaking, a value up to 10 should be sufficient for most clusters.

Contributor

Also will need to mention that custom ExecutorService can be specified, and some detail on the default thread pool it creates (and that it is a new separate pool to avoid blocking any of the default Scala pools).

Member Author

Since that API is marked as experimental, maybe it would be better to not document right away until we are sure this is what we need?

method in each of these evaluators.

To help construct the parameter grid, users can use the [`ParamGridBuilder`](api/scala/index.html#org.apache.spark.ml.tuning.ParamGridBuilder) utility.
Sets of parameters from the parameter grid can be evaluated in parallel by setting `numParallelEval` with a value of 2 or more (a value of 1 will evaluate in serial) before running model selection with `CrossValidator` or `TrainValidationSplit`.
Contributor

We should add a note that this only works in Scala/Java, not Python (or R)

*/
@Experimental
@InterfaceStability.Unstable
def setExecutorService(getExecutorService: (Int) => ExecutorService): Unit = {
Contributor

I think I prefer a simpler API:

 def setExecutorService(executorService: ExecutorService): Unit = {

Not sure the function version will work nicely with Java.

I think the idea should be that setting numParallelEval will specify the number of threads for the default thread-pool, while a custom one can be set with this method (advanced usage). If set, the custom one will override.
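
A sketch of the override semantics described above, using a hypothetical getExecutionContext helper (not the PR's actual code): a user-supplied ExecutorService wins, otherwise a fixed pool sized by the parallelism param is created.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext

// Hypothetical helper: prefer a user-supplied ExecutorService; otherwise
// build a default fixed pool sized by the parallelism param.
def getExecutionContext(
    userExecutor: Option[ExecutorService],
    numParallelEval: Int): ExecutionContext = {
  val service = userExecutor.getOrElse(Executors.newFixedThreadPool(numParallelEval))
  ExecutionContext.fromExecutorService(service)
}
```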

Member Author

I think you are probably right about the Java thing, I'll test that out.

custom one can be set with this method (advanced usage). If set, the custom one will override.

That's how this works: the executor service can be overridden to set any size thread pool; it is just passed the numParallelEval param for convenience in case it wants to use it.

Member Author

One reason I did not go with the simpler API that you mentioned is that it then requires the ExecutorService to be instantiated with a default and exist for the life of the CrossValidator or TrainValidationSplit. The way it is currently, by default the ExecutorService is created at the beginning of fit when it is needed, and then cleaned up at the end.

Member Author

Ah, I see what you are getting at. Even if I override with a custom ExecutorService but leave numParallelEval at 1, it will not use the custom one... I can fix that.

*
* @group param
*/
val numParallelEval: IntParam = new IntParam(this, "numParallelEval",
Contributor

Perhaps set this as group expertParam for now

i += 1

// Fit models in a Future with thread-pool size determined by '$numParallelEval'
val models = epm.map { paramMap =>
Contributor

If we use the implicit val above we can do:

    val models = epm.map { paramMap =>
      Future { est.fit(trainingDataset, paramMap) }.mapTo[Model[_]]
    }

} (executionContext)
}

Future.sequence[Model[_], Iterable](models)(implicitly, executionContext).onComplete { _ =>
Contributor

Likewise:

    Future.sequence[Model[_], Iterable](models).onComplete { _ =>
      trainingDataset.unpersist()
    }

val metric = eval.evaluate(model.transform(validationDataset, paramMap))
logDebug(s"Got metric $metric for model trained with $paramMap.")
metric
} (executionContext)
Contributor

Can then remove the (executionContext) here and elsewhere


// Evaluate models in a Future with thread-pool size determined by '$numParallelEval'
val foldMetricFutures = models.zip(epm).map { case (modelFuture, paramMap) =>
modelFuture.flatMap { model =>
Contributor

Note we could use a for comprehension here. But I tried it and it doesn't really make it all that much simpler, and the explicit flatMap and Future here make it a bit clearer.
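
For comparison, the for-comprehension form would look roughly like this, assuming the names from the quoted hunk (models, epm, eval, validationDataset, executionContext) are in scope:

```scala
import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext = executionContext // as in the diff

val foldMetricFutures = models.zip(epm).map { case (modelFuture, paramMap) =>
  for {
    model  <- modelFuture // Future[Model[_]]
    metric <- Future(eval.evaluate(model.transform(validationDataset, paramMap)))
  } yield metric
}
```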


val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]
val parentParallel = cvParallelModel.bestModel.parent.asInstanceOf[LogisticRegression]
assert(parentSerial.getRegParam === parentParallel.getRegParam)
Contributor

Does it make sense to also check uid equality here?

Member Author

It should probably be done in the test that already runs checkCopy on line 62 (at least until we clean up these basic checks). I'll take a look at that.

trainingDataset.unpersist()
} (executionContext)

// Evaluate models concurrently, limited by a barrier with '$numParallelEval' permits
Contributor

This comment is stale

}

// Wait for metrics to be calculated before unpersisting validation dataset
val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
Contributor

Does it make sense to also use sequence here?

    val metrics = (ThreadUtils.awaitResult(
      Future.sequence[Double, Iterable](metricFutures), Duration.Inf)).toArray

Member Author

I thought about that, but since it's a blocking call anyway, it will still be bound by the longest running thread.

Contributor

Sure, not a big deal either way

@SparkQA

SparkQA commented Aug 23, 2017

Test build #81046 has finished for PR 16774 at commit 658aacb.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 23, 2017

Test build #81047 has finished for PR 16774 at commit 2c73b0b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

@MLnick @WeichenXu123 I updated to use the trait HasParallel and fixed up some of the docs, please take another look, thanks!

@BryanCutler
Member Author

ping @MLnick , does this look ok to merge?

Contributor

@WeichenXu123 WeichenXu123 left a comment

LGTM except a minor comment, thanks!

.setEstimatorParamMaps(lrParamMaps)
.setEvaluator(eval)
.setNumFolds(2)
.setParallelism(1)
Contributor

add .setSeed(XXX)

Member Author

So the seed param here is fixed by default and doesn't need to be set to ensure consistent results. I think that's why it's not set in the other tests in this suite. I'm not a fan of this behavior and I think it's better to set it explicitly in tests, but then we should probably be consistent and set it elsewhere too. What are your thoughts on this @MLnick?

Contributor

OK I agree.

Contributor

Yeah seed defaults to a hash of the class name. There has been debate over this (see SPARK-16832). Personally I also don't like that behavior, but for now that's what it is.

.setNumFolds(2)
.setParallelism(1)
val cvSerialModel = cv.fit(dataset)
cv.setParallelism(2)
Member

@felixcheung felixcheung Aug 31, 2017

how do we validate setParallelism is parallelizing?
I mean - can we test and validate that it is functionally working?

Member Author

It's a little difficult to do this in a unit test without making it flaky. I have run tests manually and verified it is working, both by the expected speedup in timing and by the expected number of tasks running concurrently. I can post some results if that would help.

Contributor

@BryanCutler may be worth posting the result to the JIRA for posterity.

Contributor

@WeichenXu123 WeichenXu123 left a comment

LGTM except minor comments. Thanks!

i += 1

// Fit models in a Future for training in parallel
val models = epm.map { paramMap =>
Contributor

I think using the variable name modelFutures instead of models would be clearer.

}.transpose.map(_.sum)

// Calculate average metric over all splits
f2jBLAS.dscal(numModels, 1.0 / $(numFolds), metrics, 1)
Contributor

I don't like this line of code because it uses a low-level API, which makes the code difficult to read.
And this is not a bottleneck.
So I think we can simply use:

val metrics = ...
   ....
   .transpose.map(_.sum / $(numFolds))

instead.
What do you think about it?

Contributor

I must say I wondered why the author of this bothered with a BLAS call to do the computation... even if there were a few thousand param combinations it would still not be significant overhead. +1

Member Author

Yeah, I agree. I'll go ahead and make that change here.
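
A sketch of the suggested non-BLAS averaging, assuming a hypothetical splitMetrics: Seq[Seq[Double]] holding one metric sequence per fold and numFolds in scope:

```scala
// Average each parameter setting's metric across folds without a BLAS call.
// splitMetrics: Seq[Seq[Double]] -- one metric per param map, per fold.
val metrics: Array[Double] =
  splitMetrics.transpose.map(_.sum / numFolds).toArray
```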

logDebug(s"Got metric $metric for model trained with ${epm(i)}.")
metrics(i) += metric
i += 1
val models = epm.map { paramMap =>
Contributor

models ==> modelFutures

val metrics = new Array[Double](epm.length)

// Create execution context based on $(parallelism)
val executionContext = getExecutionContext
Contributor

In the corresponding PR for the PySpark implementation, the number of threads is limited by the number of models to be trained (https://github.com/WeichenXu123/spark/blob/be2f3d0ec50db4730c9e3f9a813a4eb96889f5b6/python/pyspark/ml/tuning.py#L261). We might do that, for instance, by overriding the getParallelism method. What do you think about this?
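
A one-line sketch of that capping idea, with parallelism and epm assumed in scope (hypothetical placement; the cap could live wherever the thread pool is sized):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Never spin up more threads than there are param maps to evaluate.
val poolSize = math.min(parallelism, epm.length)
val executionContext = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(poolSize))
```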

Contributor

@MLnick MLnick left a comment

LGTM pending the minor outstanding comments from @WeichenXu123

@jkbradley
Member

I think #19110 is ready to merge now. @BryanCutler @WeichenXu123 do you have a preference for which gets merged first?

@BryanCutler
Member Author

If you wouldn't mind, please merge this first (once it passes Jenkins). I had already updated this with the HasParallelism trait after waiting on OneVsRest for a while. Thanks!

@SparkQA

SparkQA commented Sep 5, 2017

Test build #81417 has finished for PR 16774 at commit 7a8221b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler
Member Author

This should be ready to merge @jkbradley @MLnick

@MLnick
Contributor

MLnick commented Sep 6, 2017

LGTM. Merged this to master. Thanks @BryanCutler and everyone for reviewing!

(FYI merged this before #19110 since in this PR the doc in the trait is a little more detailed which is slightly better IMO).

@asfgit asfgit closed this in 16c4c03 Sep 6, 2017
@WeichenXu123
Contributor

@BryanCutler @MLnick I found a bug in this PR: after saving the estimator (CV or TVS) and then loading it again, the "parallelism" setting will be lost. But I fixed this in #19208 by the way.

@jkbradley
Member

@WeichenXu123 Thanks for finding that bug! Can you please separate out your bugfix? It's good to get fixes in, rather than attaching them to PRs which may require discussion, so that we make sure that bugs don't slip into the next release.

@BryanCutler
Member Author

@WeichenXu123 , it would be great if you could separate out the bugfix. I looked in #19208 but couldn't find what you were referring to.

@WeichenXu123
Contributor

OK, I will open a separate PR. Thanks!

ghost pushed a commit to dbtsai/spark that referenced this pull request Dec 25, 2017
… in fitting

## What changes were proposed in this pull request?

Via some tests I found CrossValidator still has a memory issue: it will still occupy `O(n*sizeof(model))` memory for holding models when fitting; if well optimized, it should be `O(parallelism*sizeof(model))`.

This is because modelFutures will hold the reference to the model object after the future is complete (we can use `future.value.get.get` to fetch it), and the `Future.sequence` and the `modelFutures` array hold references to each model future. So all model objects stay referenced, and it will still occupy `O(n*sizeof(model))` memory.

I fix this by merging the `modelFuture` and `foldMetricFuture` together, and use an `AtomicInteger` to count completed fitting tasks; when all are done, it triggers `trainingDataset.unpersist`.

I previously commented on this issue on the old PR [SPARK-19357]
apache#16774 (review)
Unfortunately, at that time I did not realize that the issue still existed, but now I have confirmed it and created this PR to fix it.

## Discussion
I give 3 approaches which we can compare; after discussion I realized none of them is ideal, so we have to make a trade-off.

**After discussion with jkbradley , choose approach 3**

### Approach 1
~~The approach proposed by MrBago at~~ apache#19904 (comment)
~~This approach resolves the model object reference issue, allowing the model objects to be GCed in time. **BUT, in some cases, it still does not resolve the O(N) model memory occupation issue**. Let me use an extreme case to describe it:~~
~~Suppose we set `parallelism = 1` and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. In this approach, because `parallelism = 1`, the code has to wait for all 100 fitting tasks to complete **(at this point the memory occupied by models already reaches 100 * sizeof(model))**, and only then will it unpersist the training dataset and run the 100 evaluation tasks.~~

### Approach 2
~~This approach is the old version of my PR's code~~ apache@2cc7c28
~~This approach can make sure that in any case the peak memory occupied by models is `O(numParallelism * sizeof(model))`, but it has an issue: in some extreme cases, the "unpersist training dataset" will be delayed until most of the evaluation tasks complete. Suppose `parallelism = 1` and there are 100 fitting & evaluation tasks; each fitting & evaluation task has to be executed one by one, so the "unpersist training dataset" will only be triggered after the first 99 fitting & evaluation tasks and the 100th fitting task complete.~~

### Approach 3
After I compared approach 1 and approach 2, I realized that, in the case where parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following two goals:
- Make the peak memory occupied by models (driver-side) O(parallelism * sizeof(model))
- Unpersist the training dataset before most of the evaluation tasks start.

So I vote for a simpler approach: move the unpersist of the training dataset to the end (does this really matter?).
Because goal 1 is more important, we must make sure the peak memory occupied by models (driver-side) is O(parallelism * sizeof(model)); otherwise it brings a high risk of OOM.
Like the following code:
```
      val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
        Future[Double] {
          val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
          //...other minor codes
          val metric = eval.evaluate(model.transform(validationDataset, paramMap))
          logDebug(s"Got metric metricformodeltrainedwithparamMap.")
          metric
        } (executionContext)
      }
      val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
      trainingDataset.unpersist() // <------- unpersist at the end
      validationDataset.unpersist()
```

## How was this patch tested?

N/A

Author: WeichenXu <[email protected]>

Closes apache#19904 from WeichenXu123/fix_cross_validator_memory_issue.
