From 1edd66b0bc1c0afa28db14c0ca667dc7dd463880 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 1 Nov 2017 20:21:04 +0800 Subject: [PATCH 1/5] init pr --- .../ml/param/_shared_params_code_gen.py | 4 +- python/pyspark/ml/param/shared.py | 24 +++++++ python/pyspark/ml/tests.py | 42 ++++++++++++ python/pyspark/ml/tuning.py | 66 ++++++++++++------- shared.py | 0 5 files changed, 113 insertions(+), 23 deletions(-) create mode 100644 shared.py diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 130d1a0bae7f..0baf3269baf8 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -154,7 +154,9 @@ def get$Name(self): ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2", "TypeConverters.toInt"), ("parallelism", "the number of threads to use when running parallel algorithms (>= 1).", - "1", "TypeConverters.toInt")] + "1", "TypeConverters.toInt"), + ("collectSubModels", "whether to collect a list of sub-models trained during tuning", + "False", "TypeConverters.toBoolean")] code = [] for name, doc, defaultValueStr, typeConverter in shared: diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 4041d9c43b23..822018a614fe 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -632,6 +632,30 @@ def getParallelism(self): return self.getOrDefault(self.parallelism) +class HasCollectSubModels(Params): + """ + Mixin for param collectSubModels: whether to collect a list of sub-models trained during tuning + """ + + collectSubModels = Param(Params._dummy(), "collectSubModels", "whether to collect a list of sub-models trained during tuning", typeConverter=TypeConverters.toBoolean) + + def __init__(self): + super(HasCollectSubModels, self).__init__() + self._setDefault(collectSubModels=False) + + def setCollectSubModels(self, value): + """ + Sets the value of :py:attr:`collectSubModels`. + """ + return self._set(collectSubModels=value) + + def getCollectSubModels(self): + """ + Gets the value of collectSubModels or its default value. + """ + return self.getOrDefault(self.collectSubModels) + + class DecisionTreeParams(Params): """ Mixin for Decision Tree parameters. diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 2f1f3af957e4..813b35bb3b12 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -857,6 +857,29 @@ def test_parallel_evaluation(self): cvParallelModel = cv.fit(dataset) self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics) + def test_expose_sub_models(self): + temp_path = tempfile.mkdtemp() + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + evaluator = BinaryClassificationEvaluator() + + numFolds = 3 + cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, + numFolds=numFolds, collectSubModels=True) + cvModel = cv.fit(dataset) + subModels = cvModel.subModels + assert len(subModels) == numFolds + for i in range(numFolds): + assert len(subModels[i]) == 2 + def test_save_load_nested_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( @@ -1025,6 +1048,25 @@ def test_parallel_evaluation(self): tvsParallelModel = tvs.fit(dataset) self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics) + def test_expose_sub_models(self): + temp_path = tempfile.mkdtemp() + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + evaluator = BinaryClassificationEvaluator() + tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, + collectSubModels=True) + tvsModel = tvs.fit(dataset) + subModels = tvsModel.subModels + assert len(subModels) == 2 + + def test_save_load_nested_estimator(self): # This tests saving and loading the trained model only. # Save/load for TrainValidationSplit will be added later: SPARK-13786 diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 47351133524e..40299ce4f831 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -22,7 +22,7 @@ from pyspark.ml import Estimator, Model from pyspark.ml.common import _py2java from pyspark.ml.param import Params, Param, TypeConverters -from pyspark.ml.param.shared import HasParallelism, HasSeed +from pyspark.ml.param.shared import HasCollectSubModels, HasParallelism, HasSeed from pyspark.ml.util import * from pyspark.ml.wrapper import JavaParams from pyspark.sql.functions import rand @@ -170,7 +170,7 @@ def _to_java_impl(self): return java_estimator, java_epms, java_evaluator -class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable): +class CrossValidator(Estimator, ValidatorParams, HasParallelism, HasCollectSubModels, MLReadable, MLWritable): """ K-fold cross validation performs model selection by splitting the dataset into a set of @@ -209,10 +209,10 @@ class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLW @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, - seed=None, parallelism=1): + seed=None, parallelism=1, collectSubModels=False): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ - seed=None, parallelism=1) + seed=None, parallelism=1, collectSubModels=False) """ super(CrossValidator, self).__init__() self._setDefault(numFolds=3, parallelism=1) @@ -222,10 +222,10 @@ def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numF @keyword_only @since("1.4.0") def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, - seed=None, parallelism=1): + seed=None, parallelism=1, collectSubModels=False): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ - seed=None, parallelism=1): + seed=None, parallelism=1, collectSubModels=False): Sets params for cross validator. """ kwargs = self._input_kwargs @@ -258,6 +258,10 @@ def _fit(self, dataset): metrics = [0.0] * numModels pool = ThreadPool(processes=min(self.getParallelism(), numModels)) + subModels = None + collectSubModelsParam = self.getCollectSubModels() + if (collectSubModelsParam == True): + subModels = [[None for j in range(numModels)] for i in range(nFolds)] for i in range(nFolds): validateLB = i * h @@ -266,13 +270,16 @@ def _fit(self, dataset): validation = df.filter(condition).cache() train = df.filter(~condition).cache() - def singleTrain(paramMap): + def singleTrain(paramMapIndex): + paramMap = epm[paramMapIndex] model = est.fit(train, paramMap) + if (collectSubModelsParam == True): + subModels[i][paramMapIndex] = model # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, paramMap)) return metric - currentFoldMetrics = pool.map(singleTrain, epm) + currentFoldMetrics = pool.map(singleTrain, range(numModels)) for j in range(numModels): metrics[j] += (currentFoldMetrics[j] / nFolds) validation.unpersist() @@ -283,7 +290,7 @@ def singleTrain(paramMap): else: bestIndex = np.argmin(metrics) bestModel = est.fit(dataset, epm[bestIndex]) - return self._copyValues(CrossValidatorModel(bestModel, metrics)) + return self._copyValues(CrossValidatorModel(bestModel, metrics, subModels)) @since("1.4.0") def copy(self, extra=None): @@ -363,13 +370,15 @@ class CrossValidatorModel(Model, ValidatorParams, MLReadable, MLWritable): .. versionadded:: 1.4.0 """ - def __init__(self, bestModel, avgMetrics=[]): + def __init__(self, bestModel, avgMetrics=[], subModels=None): super(CrossValidatorModel, self).__init__() #: best model from cross validation self.bestModel = bestModel #: Average cross-validation metrics for each paramMap in #: CrossValidator.estimatorParamMaps, in the corresponding order. self.avgMetrics = avgMetrics + #: sub model list from cross validation + self.subModels=subModels def _transform(self, dataset): return self.bestModel.transform(dataset) @@ -389,7 +398,8 @@ def copy(self, extra=None): extra = dict() bestModel = self.bestModel.copy(extra) avgMetrics = self.avgMetrics - return CrossValidatorModel(bestModel, avgMetrics) + subModels = self.subModels + return CrossValidatorModel(bestModel, avgMetrics, subModels) @since("2.3.0") def write(self): @@ -439,7 +449,8 @@ def _to_java(self): return _java_obj -class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable): +class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, HasCollectSubModels, + MLReadable, MLWritable): """ .. note:: Experimental @@ -474,10 +485,10 @@ class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadabl @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, - parallelism=1, seed=None): + parallelism=1, collectSubModels=False, seed=None): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ - parallelism=1, seed=None) + parallelism=1, collectSubModels=False, seed=None) """ super(TrainValidationSplit, self).__init__() self._setDefault(trainRatio=0.75, parallelism=1) @@ -487,10 +498,10 @@ def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trai @since("2.0.0") @keyword_only def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, - parallelism=1, seed=None): + parallelism=1, collectSubModels=False, seed=None): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ - parallelism=1, seed=None): + parallelism=1, collectSubModels=False, seed=None): Sets params for the train validation split. """ kwargs = self._input_kwargs @@ -523,13 +534,21 @@ def _fit(self, dataset): validation = df.filter(condition).cache() train = df.filter(~condition).cache() - def singleTrain(paramMap): + subModels = None + collectSubModelsParam = self.getCollectSubModels() + if (collectSubModelsParam == True): + subModels = [None for i in range(numModels)] + + def singleTrain(paramMapIndex): + paramMap = epm[paramMapIndex] model = est.fit(train, paramMap) + if (collectSubModelsParam): + subModels[paramMapIndex] = model metric = eva.evaluate(model.transform(validation, paramMap)) return metric pool = ThreadPool(processes=min(self.getParallelism(), numModels)) - metrics = pool.map(singleTrain, epm) + metrics = pool.map(singleTrain, range(numModels)) train.unpersist() validation.unpersist() @@ -538,7 +557,7 @@ def singleTrain(paramMap): else: bestIndex = np.argmin(metrics) bestModel = est.fit(dataset, epm[bestIndex]) - return self._copyValues(TrainValidationSplitModel(bestModel, metrics)) + return self._copyValues(TrainValidationSplitModel(bestModel, metrics, subModels)) @since("2.0.0") def copy(self, extra=None): @@ -617,12 +636,14 @@ class TrainValidationSplitModel(Model, ValidatorParams, MLReadable, MLWritable): .. versionadded:: 2.0.0 """ - def __init__(self, bestModel, validationMetrics=[]): + def __init__(self, bestModel, validationMetrics=[], subModels=None): super(TrainValidationSplitModel, self).__init__() - #: best model from cross validation + #: best model from train validation split self.bestModel = bestModel #: evaluated validation metrics self.validationMetrics = validationMetrics + #: sub models from train validation split + self.subModels = subModels def _transform(self, dataset): return self.bestModel.transform(dataset) @@ -643,7 +664,8 @@ def copy(self, extra=None): extra = dict() bestModel = self.bestModel.copy(extra) validationMetrics = list(self.validationMetrics) - return TrainValidationSplitModel(bestModel, validationMetrics) + subModels = self.subModels + return TrainValidationSplitModel(bestModel, validationMetrics, subModels) @since("2.3.0") def write(self): diff --git a/shared.py b/shared.py new file mode 100644 index 000000000000..e69de29bb2d1 From 9e27f6b8e3b28fbde022b944f5089d5bb0fdd02c Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Fri, 17 Nov 2017 11:38:03 +0800 Subject: [PATCH 2/5] add submodels save load support --- .../spark/ml/tuning/CrossValidator.scala | 11 +++++ .../ml/tuning/TrainValidationSplit.scala | 11 +++++ python/pyspark/ml/tests.py | 44 ++++++++++++++++--- python/pyspark/ml/tuning.py | 21 ++++++++- python/pyspark/ml/util.py | 4 ++ 5 files changed, 83 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala index 1682ca91bf83..c9138d3cd9e6 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala @@ -279,6 +279,17 @@ class CrossValidatorModel private[ml] ( this } + // A Python-friendly auxiliary method + private[tuning] def setSubModels(subModels: JList[JList[Model[_]]]) + : CrossValidatorModel = { + _subModels = if (subModels != null) { + Some(subModels.asScala.toArray.map(_.asScala.toArray)) + } else { + None + } + this + } + /** * @return submodels represented in two dimension array. The index of outer array is the * fold index, and the index of inner array corresponds to the ordering of diff --git a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala index c73bd1847547..8385c5a80a36 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala @@ -271,6 +271,17 @@ class TrainValidationSplitModel private[ml] ( this } + // A Python-friendly auxiliary method + private[tuning] def setSubModels(subModels: JList[Model[_]]) + : TrainValidationSplitModel = { + _subModels = if (subModels != null) { + Some(subModels.asScala.toArray) + } else { + None + } + this + } + /** * @return submodels represented in array. The index of array corresponds to the ordering of * estimatorParamMaps diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 813b35bb3b12..e13ae9e667bf 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -874,11 +874,30 @@ def test_expose_sub_models(self): numFolds = 3 cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, numFolds=numFolds, collectSubModels=True) + + def checkSubModels(subModels): + assert len(subModels) == numFolds + for i in range(numFolds): + assert len(subModels[i]) == len(grid) + cvModel = cv.fit(dataset) - subModels = cvModel.subModels - assert len(subModels) == numFolds + checkSubModels(cvModel.subModels) + + # Test the default value for option "persistSubModel" to be "true" + testSubPath = temp_path + "/testCrossValidatorSubModels" + savingPathWithSubModels = testSubPath + "cvModel3" + cvModel.save(savingPathWithSubModels) + cvModel3 = CrossValidatorModel.load(savingPathWithSubModels) + checkSubModels(cvModel3.subModels) + + savingPathWithoutSubModels = testSubPath + "cvModel2" + cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels) + cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels) + assert cvModel2.subModels is None + for i in range(numFolds): - assert len(subModels[i]) == 2 + for j in range(len(grid)): + self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid) def test_save_load_nested_estimator(self): temp_path = tempfile.mkdtemp() @@ -1063,9 +1082,22 @@ def test_expose_sub_models(self): tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, collectSubModels=True) tvsModel = tvs.fit(dataset) - subModels = tvsModel.subModels - assert len(subModels) == 2 - + assert len(tvsModel.subModels) == len(grid) + + # Test the default value for option "persistSubModel" to be "true" + testSubPath = temp_path + "/testTrainValidationSplitSubModels" + savingPathWithSubModels = testSubPath + "cvModel3" + tvsModel.save(savingPathWithSubModels) + tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels) + assert len(tvsModel3.subModels) == len(grid) + + savingPathWithoutSubModels = testSubPath + "cvModel2" + tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels) + tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels) + assert tvsModel2.subModels is None + + for i in range(len(grid)): + self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid) def test_save_load_nested_estimator(self): # This tests saving and loading the trained model only. diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 40299ce4f831..df45a3d1435b 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -418,13 +418,17 @@ def _from_java(cls, java_stage): Given a Java CrossValidatorModel, create and return a Python wrapper of it. Used for ML persistence. """ - bestModel = JavaParams._from_java(java_stage.bestModel()) estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage) py_stage = cls(bestModel=bestModel).setEstimator(estimator) py_stage = py_stage.setEstimatorParamMaps(epms).setEvaluator(evaluator) + if java_stage.hasSubModels(): + py_stage.subModels = [[JavaParams._from_java(sub_model) + for sub_model in fold_sub_models] + for fold_sub_models in java_stage.subModels()] + py_stage._resetUid(java_stage.uid()) return py_stage @@ -446,6 +450,11 @@ def _to_java(self): _java_obj.set("evaluator", evaluator) _java_obj.set("estimator", estimator) _java_obj.set("estimatorParamMaps", epms) + + if self.subModels is not None: + java_sub_models = [[sub_model._to_java() for sub_model in fold_sub_models] + for fold_sub_models in self.subModels] + _java_obj.setSubModels(java_sub_models) return _java_obj @@ -623,7 +632,6 @@ def _to_java(self): _java_obj.setTrainRatio(self.getTrainRatio()) _java_obj.setSeed(self.getSeed()) _java_obj.setParallelism(self.getParallelism()) - return _java_obj @@ -693,6 +701,10 @@ def _from_java(cls, java_stage): py_stage = cls(bestModel=bestModel).setEstimator(estimator) py_stage = py_stage.setEstimatorParamMaps(epms).setEvaluator(evaluator) + if java_stage.hasSubModels(): + py_stage.subModels = [JavaParams._from_java(sub_model) + for sub_model in java_stage.subModels()] + py_stage._resetUid(java_stage.uid()) return py_stage @@ -714,6 +726,11 @@ def _to_java(self): _java_obj.set("evaluator", evaluator) _java_obj.set("estimator", estimator) _java_obj.set("estimatorParamMaps", epms) + + if self.subModels is not None: + java_sub_models = [sub_model._to_java() for sub_model in self.subModels] + _java_obj.setSubModels(java_sub_models) + return _java_obj diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py index c3c47bd79459..a486c6a3fdeb 100644 --- a/python/pyspark/ml/util.py +++ b/python/pyspark/ml/util.py @@ -169,6 +169,10 @@ def overwrite(self): self._jwrite.overwrite() return self + def option(self, key, value): + self._jwrite.option(key, value) + return self + def context(self, sqlContext): """ Sets the SQL context to use for saving. From 758bc24e8328e8b496b28c7cd8b8183458f18953 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Sun, 19 Nov 2017 10:36:08 +0800 Subject: [PATCH 3/5] fix_RAT_check --- shared.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 shared.py diff --git a/shared.py b/shared.py deleted file mode 100644 index e69de29bb2d1..000000000000 From ae082f564ff2c23c976201ccf91a7dcd6726e4c9 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Sun, 19 Nov 2017 10:44:51 +0800 Subject: [PATCH 4/5] fix python style --- python/pyspark/ml/tuning.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index df45a3d1435b..9ac23625ab3b 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -170,7 +170,8 @@ def _to_java_impl(self): return java_estimator, java_epms, java_evaluator -class CrossValidator(Estimator, ValidatorParams, HasParallelism, HasCollectSubModels, MLReadable, MLWritable): +class CrossValidator(Estimator, ValidatorParams, HasParallelism, HasCollectSubModels, + MLReadable, MLWritable): """ K-fold cross validation performs model selection by splitting the dataset into a set of @@ -260,7 +261,7 @@ def _fit(self, dataset): pool = ThreadPool(processes=min(self.getParallelism(), numModels)) subModels = None collectSubModelsParam = self.getCollectSubModels() - if (collectSubModelsParam == True): + if collectSubModelsParam: subModels = [[None for j in range(numModels)] for i in range(nFolds)] for i in range(nFolds): @@ -273,7 +274,7 @@ def _fit(self, dataset): def singleTrain(paramMapIndex): paramMap = epm[paramMapIndex] model = est.fit(train, paramMap) - if (collectSubModelsParam == True): + if collectSubModelsParam: subModels[i][paramMapIndex] = model # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, paramMap)) @@ -378,7 +379,7 @@ def __init__(self, bestModel, avgMetrics=[], subModels=None): #: CrossValidator.estimatorParamMaps, in the corresponding order. self.avgMetrics = avgMetrics #: sub model list from cross validation - self.subModels=subModels + self.subModels = subModels def _transform(self, dataset): return self.bestModel.transform(dataset) @@ -545,13 +546,13 @@ def _fit(self, dataset): subModels = None collectSubModelsParam = self.getCollectSubModels() - if (collectSubModelsParam == True): + if collectSubModelsParam: subModels = [None for i in range(numModels)] def singleTrain(paramMapIndex): paramMap = epm[paramMapIndex] model = est.fit(train, paramMap) - if (collectSubModelsParam): + if collectSubModelsParam: subModels[paramMapIndex] = model metric = eva.evaluate(model.transform(validation, paramMap)) return metric From 80f07fb93a00e2cda402d312e5c6e915bb400c12 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Fri, 13 Apr 2018 18:35:51 +0800 Subject: [PATCH 5/5] address comments --- .../pyspark/ml/param/_shared_params_code_gen.py | 5 ++++- python/pyspark/ml/param/shared.py | 4 ++-- python/pyspark/ml/tests.py | 16 ++++++++++------ python/pyspark/ml/tuning.py | 12 ++++++++++-- 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 78734a0aac19..6e9e0a34cdfd 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -157,7 +157,10 @@ def get$Name(self): "TypeConverters.toInt"), ("parallelism", "the number of threads to use when running parallel algorithms (>= 1).", "1", "TypeConverters.toInt"), - ("collectSubModels", "whether to collect a list of sub-models trained during tuning", + ("collectSubModels", "Param for whether to collect a list of sub-models trained during " + + "tuning. If set to false, then only the single best sub-model will be available after " + + "fitting. If set to true, then all sub-models will be available. Warning: For large " + + "models, collecting all sub-models can cause OOMs on the Spark driver.", "False", "TypeConverters.toBoolean"), ("loss", "the loss function to be optimized.", None, "TypeConverters.toString")] diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 9e082f089c28..08408ee8fbfc 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -657,10 +657,10 @@ def getParallelism(self): class HasCollectSubModels(Params): """ - Mixin for param collectSubModels: whether to collect a list of sub-models trained during tuning + Mixin for param collectSubModels: Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver. """ - collectSubModels = Param(Params._dummy(), "collectSubModels", "whether to collect a list of sub-models trained during tuning", typeConverter=TypeConverters.toBoolean) + collectSubModels = Param(Params._dummy(), "collectSubModels", "Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.", typeConverter=TypeConverters.toBoolean) def __init__(self): super(HasCollectSubModels, self).__init__() diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 2f6d79bc11b0..2ec0be60e9fa 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1037,9 +1037,9 @@ def test_expose_sub_models(self): numFolds=numFolds, collectSubModels=True) def checkSubModels(subModels): - assert len(subModels) == numFolds + self.assertEqual(len(subModels), numFolds) for i in range(numFolds): - assert len(subModels[i]) == len(grid) + self.assertEqual(len(subModels[i]), len(grid)) cvModel = cv.fit(dataset) checkSubModels(cvModel.subModels) @@ -1050,11 +1050,13 @@ def checkSubModels(subModels): cvModel.save(savingPathWithSubModels) cvModel3 = CrossValidatorModel.load(savingPathWithSubModels) checkSubModels(cvModel3.subModels) + cvModel4 = cvModel3.copy() + checkSubModels(cvModel4.subModels) savingPathWithoutSubModels = testSubPath + "cvModel2" cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels) cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels) - assert cvModel2.subModels is None + self.assertEqual(cvModel2.subModels, None) for i in range(numFolds): for j in range(len(grid)): @@ -1243,19 +1245,21 @@ def test_expose_sub_models(self): tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, collectSubModels=True) tvsModel = tvs.fit(dataset) - assert len(tvsModel.subModels) == len(grid) + self.assertEqual(len(tvsModel.subModels), len(grid)) # Test the default value for option "persistSubModel" to be "true" testSubPath = temp_path + "/testTrainValidationSplitSubModels" savingPathWithSubModels = testSubPath + "cvModel3" tvsModel.save(savingPathWithSubModels) tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels) - assert len(tvsModel3.subModels) == len(grid) + self.assertEqual(len(tvsModel3.subModels), len(grid)) + tvsModel4 = tvsModel3.copy() + self.assertEqual(len(tvsModel4.subModels), len(grid)) savingPathWithoutSubModels = testSubPath + "cvModel2" tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels) tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels) - assert tvsModel2.subModels is None + self.assertEqual(tvsModel2.subModels, None) for i in range(len(grid)): self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index ef086ff6b64c..0c8029f293cf 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -354,9 +354,11 @@ def _from_java(cls, java_stage): numFolds = java_stage.getNumFolds() seed = java_stage.getSeed() parallelism = java_stage.getParallelism() + collectSubModels = java_stage.getCollectSubModels() # Create a new instance of this stage. py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - numFolds=numFolds, seed=seed, parallelism=parallelism) + numFolds=numFolds, seed=seed, parallelism=parallelism, + collectSubModels=collectSubModels) py_stage._resetUid(java_stage.uid()) return py_stage @@ -376,6 +378,7 @@ def _to_java(self): _java_obj.setSeed(self.getSeed()) _java_obj.setNumFolds(self.getNumFolds()) _java_obj.setParallelism(self.getParallelism()) + _java_obj.setCollectSubModels(self.getCollectSubModels()) return _java_obj @@ -410,6 +413,7 @@ def copy(self, extra=None): and some extra params. This copies the underlying bestModel, creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters over. + It does not copy the extra Params into the subModels. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance @@ -628,9 +632,11 @@ def _from_java(cls, java_stage): trainRatio = java_stage.getTrainRatio() seed = java_stage.getSeed() parallelism = java_stage.getParallelism() + collectSubModels = java_stage.getCollectSubModels() # Create a new instance of this stage. py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - trainRatio=trainRatio, seed=seed, parallelism=parallelism) + trainRatio=trainRatio, seed=seed, parallelism=parallelism, + collectSubModels=collectSubModels) py_stage._resetUid(java_stage.uid()) return py_stage @@ -650,6 +656,7 @@ def _to_java(self): _java_obj.setTrainRatio(self.getTrainRatio()) _java_obj.setSeed(self.getSeed()) _java_obj.setParallelism(self.getParallelism()) + _java_obj.setCollectSubModels(self.getCollectSubModels()) return _java_obj @@ -682,6 +689,7 @@ def copy(self, extra=None): creates a deep copy of the embedded paramMap, and copies the embedded and extra parameters over. And, this creates a shallow copy of the validationMetrics. + It does not copy the extra Params into the subModels. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance