From c5f3265a13c39c693d1fd13d46fadff89d2ab6da Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Fri, 13 Feb 2015 21:21:56 +0200 Subject: [PATCH 1/2] [Ml] SPARK-5804 Explicitly manage cache in Crossvalidator k-fold loop On a big dataset explicitly unpersist train and validation folds allows to load more data into memory in the next loop iteration. On my environment (single node 8Gb worker RAM, 2 GB dataset file, 3 folds for cross validation), saved more than 5 minutes. --- .../scala/org/apache/spark/ml/tuning/CrossValidator.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 324b1ba784387..1058b61180ff4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -77,10 +77,12 @@ class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorP val splits = MLUtils.kFold(dataset.rdd, map(numFolds), 0) splits.zipWithIndex.foreach { case ((training, validation), splitIndex) => val trainingDataset = sqlCtx.createDataFrame(training, schema).cache() - val validationDataset = sqlCtx.createDataFrame(validation, schema).cache() + val validationDataset = sqlCtx.createDataFrame(validation, schema) // multi-model training logDebug(s"Train split $splitIndex with multiple sets of parameters.") val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]] + trainingDataset.unpersist() + validationDataset.cache() var i = 0 while (i < numModels) { val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)), map) @@ -88,6 +90,7 @@ class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorP metrics(i) += metric i += 1 } + validationDataset.unpersist() } f2jBLAS.dscal(numModels, 1.0 / map(numFolds), metrics, 1) logInfo(s"Average cross-validation metrics: ${metrics.toSeq}") From 66a7cfb3e119739ada90c31cf12c2f38ddc8e02d Mon Sep 17 00:00:00 2001 From: Peter Rudenko Date: Fri, 13 Feb 2015 21:34:53 +0200 Subject: [PATCH 2/2] Move validationDataset cache to declaration --- .../main/scala/org/apache/spark/ml/tuning/CrossValidator.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 1058b61180ff4..9663a636e5f9c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -77,12 +77,11 @@ class CrossValidator extends Estimator[CrossValidatorModel] with CrossValidatorP val splits = MLUtils.kFold(dataset.rdd, map(numFolds), 0) splits.zipWithIndex.foreach { case ((training, validation), splitIndex) => val trainingDataset = sqlCtx.createDataFrame(training, schema).cache() - val validationDataset = sqlCtx.createDataFrame(validation, schema) + val validationDataset = sqlCtx.createDataFrame(validation, schema).cache() // multi-model training logDebug(s"Train split $splitIndex with multiple sets of parameters.") val models = est.fit(trainingDataset, epm).asInstanceOf[Seq[Model[_]]] trainingDataset.unpersist() - validationDataset.cache() var i = 0 while (i < numModels) { val metric = eval.evaluate(models(i).transform(validationDataset, epm(i)), map)