From c745198a9765eee982f3f1a29fc3620b55559d17 Mon Sep 17 00:00:00 2001 From: Louiszr Date: Sat, 15 Aug 2020 18:19:12 +0100 Subject: [PATCH 01/11] Added tests for SPARK-32092 --- python/pyspark/ml/tests/test_tuning.py | 49 ++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 66f1ea20a4b25..c088e8bba49cb 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -89,15 +89,34 @@ def test_copy(self): grid = (ParamGridBuilder() .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) .build()) - cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) + cv = CrossValidator( + estimator=iee, + estimatorParamMaps=grid, + evaluator=evaluator, + numFolds=2 + ) cvCopied = cv.copy() - self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid) + for param in [ + lambda x: x.getEstimator().uid, + lambda x: x.getNumFolds(), + lambda x: x.getFoldCol(), + lambda x: x.getCollectSubModels(), + lambda x: x.getParallelism(), + lambda x: x.getSeed() + ]: + self.assertEqual(param(cv), param(cvCopied)) cvModel = cv.fit(dataset) cvModelCopied = cvModel.copy() for index in range(len(cvModel.avgMetrics)): self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index]) < 0.0001) + for param in [ + lambda x: x.getNumFolds(), + lambda x: x.getFoldCol(), + lambda x: x.getSeed() + ]: + self.assertEqual(param(cvModel), param(cvModelCopied)) def test_fit_minimize_metric(self): dataset = self.spark.createDataFrame([ @@ -166,16 +185,34 @@ def test_save_load_trained_model(self): lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() evaluator = BinaryClassificationEvaluator() - cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + cv = CrossValidator( + estimator=lr, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True, + numFolds=4 + ) cvModel = cv.fit(dataset) lrModel = cvModel.bestModel - cvModelPath = temp_path + "/cvModel" - lrModel.save(cvModelPath) - loadedLrModel = LogisticRegressionModel.load(cvModelPath) + lrModelPath = temp_path + "/lrModel" + lrModel.save(lrModelPath) + loadedLrModel = LogisticRegressionModel.load(lrModelPath) self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + cvModelPath = temp_path + "/cvModel" + cvModel.save(cvModelPath) + loadedCvModel = CrossValidatorModel.load(cvModelPath) + for param in [ + lambda x: x.getNumFolds(), + lambda x: x.getFoldCol(), + lambda x: x.getSeed(), + lambda x: len(x.subModels) + ]: + self.assertEqual(param(cvModel), param(loadedCvModel)) + + def test_save_load_simple_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( From 202ffcf33601976e54bde4b5e710be620ea0a1a6 Mon Sep 17 00:00:00 2001 From: Louiszr Date: Sat, 15 Aug 2020 18:47:41 +0100 Subject: [PATCH 02/11] Fixed copy() --- python/pyspark/ml/tuning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index d7800e0c9020e..6b2f0df34c006 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -536,7 +536,7 @@ def copy(self, extra=None): bestModel = self.bestModel.copy(extra) avgMetrics = self.avgMetrics subModels = self.subModels - return CrossValidatorModel(bestModel, avgMetrics, subModels) + return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels), extra=extra) @since("2.3.0") def write(self): From fe5837d544c791e8434e2c310162abbf273c5160 Mon Sep 17 00:00:00 2001 From: Louiszr Date: Sat, 15 Aug 2020 20:15:47 +0100 Subject: [PATCH 03/11] Fixed to_java and from_java --- python/pyspark/ml/tuning.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 6b2f0df34c006..a2a3fa87ddb9e 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -560,8 +560,17 @@ def _from_java(cls, java_stage): avgMetrics = _java2py(sc, java_stage.avgMetrics()) estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage) - py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)._set(estimator=estimator) - py_stage = py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator) + py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "numFolds": java_stage.getNumFolds(), + "foldCol": java_stage.getFoldCol(), + "seed": java_stage.getSeed(), + } + for param_name, param_val in params.items(): + py_stage = py_stage._set(**{param_name: param_val}) if java_stage.hasSubModels(): py_stage.subModels = [[JavaParams._from_java(sub_model) @@ -585,9 +594,16 @@ def _to_java(self): _py2java(sc, self.avgMetrics)) estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl() - _java_obj.set("evaluator", evaluator) - _java_obj.set("estimator", estimator) - _java_obj.set("estimatorParamMaps", epms) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "numFolds": self.getNumFolds(), + "foldCol": self.getFoldCol(), + "seed": self.getSeed(), + } + for param_name, param_val in params.items(): + _java_obj.set(param_name, param_val) if self.subModels is not None: java_sub_models = [[sub_model._to_java() for sub_model in fold_sub_models] From c3a8407354722873f5e14037ff135755f677e32d Mon Sep 17 00:00:00 2001 From: Louiszr Date: Sat, 15 Aug 2020 20:51:49 +0100 Subject: [PATCH 04/11] Added comments for JIRA reference --- python/pyspark/ml/tests/test_tuning.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index c088e8bba49cb..61401d6eee92b 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -98,6 +98,7 @@ def test_copy(self): cvCopied = cv.copy() for param in [ lambda x: x.getEstimator().uid, + # SPARK-32092: CrossValidator.copy() needs to copy all existing params lambda x: x.getNumFolds(), lambda x: x.getFoldCol(), lambda x: x.getCollectSubModels(), @@ -111,6 +112,7 @@ def test_copy(self): for index in range(len(cvModel.avgMetrics)): self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index]) < 0.0001) + # SPARK-32092: CrossValidator.copy() needs to copy all existing params for param in [ lambda x: x.getNumFolds(), lambda x: x.getFoldCol(), @@ -201,6 +203,7 @@ def test_save_load_trained_model(self): self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + # SPARK-32092: Saving and then loading CrossValidatorModel should not change the params cvModelPath = temp_path + "/cvModel" cvModel.save(cvModelPath) loadedCvModel = CrossValidatorModel.load(cvModelPath) From 850d30723034cb03f7e9fc35b6380fd1a18514de Mon Sep 17 00:00:00 2001 From: Louiszr Date: Sun, 16 Aug 2020 14:39:01 +0100 Subject: [PATCH 05/11] Style changes --- python/pyspark/ml/tests/test_tuning.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 61401d6eee92b..051b319e78507 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -189,7 +189,7 @@ def test_save_load_trained_model(self): evaluator = BinaryClassificationEvaluator() cv = CrossValidator( estimator=lr, - estimatorParamMaps=grid, + estimatorParamMaps=grid, evaluator=evaluator, collectSubModels=True, numFolds=4 @@ -215,7 +215,6 @@ def test_save_load_trained_model(self): ]: self.assertEqual(param(cvModel), param(loadedCvModel)) - def test_save_load_simple_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( From a7a9163ccb0d30c508422cd7c709dde2e7508444 Mon Sep 17 00:00:00 2001 From: Louiszr Date: Mon, 17 Aug 2020 22:28:40 +0100 Subject: [PATCH 06/11] Updated tests --- python/pyspark/ml/tests/test_tuning.py | 70 ++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 5 deletions(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 051b319e78507..779333244f92c 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -93,6 +93,7 @@ def test_copy(self): estimator=iee, estimatorParamMaps=grid, evaluator=evaluator, + collectSubModels=True, numFolds=2 ) cvCopied = cv.copy() @@ -119,6 +120,19 @@ def test_copy(self): lambda x: x.getSeed() ]: self.assertEqual(param(cvModel), param(cvModelCopied)) + + cvModel.avgMetrics[0] = 'foo' + self.assertNotEqual( + cvModelCopied.avgMetrics[0], + 'foo', + "Changing the original avgMetrics should not affect the copied model" + ) + cvModel.subModels[0] = 'foo' + self.assertNotEqual( + cvModelCopied.subModels[0], + 'foo', + "Changing the original subModels should not affect the copied model" + ) def test_fit_minimize_metric(self): dataset = self.spark.createDataFrame([ @@ -562,16 +576,30 @@ def test_save_load_trained_model(self): lr = LogisticRegression() grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() evaluator = BinaryClassificationEvaluator() - tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + tvs = TrainValidationSplit( + estimator=lr, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True + ) tvsModel = tvs.fit(dataset) lrModel = tvsModel.bestModel - tvsModelPath = temp_path + "/tvsModel" - lrModel.save(tvsModelPath) - loadedLrModel = LogisticRegressionModel.load(tvsModelPath) + lrModelPath = temp_path + "/lrModel" + lrModel.save(lrModelPath) + loadedLrModel = LogisticRegressionModel.load(lrModelPath) self.assertEqual(loadedLrModel.uid, lrModel.uid) self.assertEqual(loadedLrModel.intercept, lrModel.intercept) + tvsModelPath = temp_path + "/tvsModel" + tvsModel.save(tvsModelPath) + loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath) + for param in [ + lambda x: x.getSeed(), + lambda x: x.getTrainRatio(), + ]: + self.assertEqual(param(tvsModel), param(loadedTvsModel)) + def test_save_load_simple_estimator(self): # This tests saving and loading the trained model only. # Save/load for TrainValidationSplit will be added later: SPARK-13786 @@ -773,11 +801,30 @@ def test_copy(self): grid = ParamGridBuilder() \ .addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \ .build() - tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator) + tvs = TrainValidationSplit( + estimator=iee, + estimatorParamMaps=grid, + evaluator=evaluator, + collectSubModels=True + ) tvsModel = tvs.fit(dataset) tvsCopied = tvs.copy() tvsModelCopied = tvsModel.copy() + for param in [ + lambda x: x.getCollectSubModels(), + lambda x: x.getParallelism(), + lambda x: x.getSeed(), + lambda x: x.getTrainRatio(), + ]: + self.assertEqual(param(tvs), param(tvsCopied)) + + for param in [ + lambda x: x.getSeed(), + lambda x: x.getTrainRatio(), + ]: + self.assertEqual(param(tvsModel), param(tvsModelCopied)) + self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid, "Copied TrainValidationSplit has the same uid of Estimator") @@ -789,6 +836,19 @@ def test_copy(self): self.assertEqual(tvsModel.validationMetrics[index], tvsModelCopied.validationMetrics[index]) + tvsModel.validationMetrics[0] = 'foo' + self.assertNotEqual( + tvsModelCopied.validationMetrics[0], + 'foo', + "Changing the original validationMetrics should not affect the copied model" + ) + tvsModel.subModels[0] = 'foo' + self.assertNotEqual( + tvsModelCopied.subModels[0], + 'foo', + "Changing the original subModels should not affect the copied model" + ) + if __name__ == "__main__": from pyspark.ml.tests.test_tuning import * # noqa: F401 From b8311614147ddc0f0ee11e8ae668d4e405590a2e Mon Sep 17 00:00:00 2001 From: Louiszr Date: Mon, 17 Aug 2020 23:21:53 +0100 Subject: [PATCH 07/11] Deeper copy and fixed tvs --- python/pyspark/ml/tuning.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index a2a3fa87ddb9e..b443c4ad099b1 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -534,8 +534,8 @@ def copy(self, extra=None): if extra is None: extra = dict() bestModel = self.bestModel.copy(extra) - avgMetrics = self.avgMetrics - subModels = self.subModels + avgMetrics = list(self.avgMetrics) + subModels = [model.copy() for model in self.subModels] return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels), extra=extra) @since("2.3.0") @@ -890,8 +890,8 @@ def copy(self, extra=None): extra = dict() bestModel = self.bestModel.copy(extra) validationMetrics = list(self.validationMetrics) - subModels = self.subModels - return TrainValidationSplitModel(bestModel, validationMetrics, subModels) + subModels = [model.copy() for model in self.subModels] + return self._copyValues(TrainValidationSplitModel(bestModel, validationMetrics, subModels), extra=extra) @since("2.3.0") def write(self): @@ -919,8 +919,16 @@ def _from_java(cls, java_stage): cls)._from_java_impl(java_stage) # Create a new instance of this stage. py_stage = cls(bestModel=bestModel, - validationMetrics=validationMetrics)._set(estimator=estimator) - py_stage = py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator) + validationMetrics=validationMetrics) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "trainRatio": java_stage.getTrainRatio(), + "seed": java_stage.getSeed(), + } + for param_name, param_val in params.items(): + py_stage = py_stage._set(**{param_name: param_val}) if java_stage.hasSubModels(): py_stage.subModels = [JavaParams._from_java(sub_model) @@ -943,9 +951,15 @@ def _to_java(self): _py2java(sc, self.validationMetrics)) estimator, epms, evaluator = super(TrainValidationSplitModel, self)._to_java_impl() - _java_obj.set("evaluator", evaluator) - _java_obj.set("estimator", estimator) - _java_obj.set("estimatorParamMaps", epms) + params = { + "evaluator": evaluator, + "estimator": estimator, + "estimatorParamMaps": epms, + "trainRatio": self.getTrainRatio(), + "seed": self.getSeed(), + } + for param_name, param_val in params.items(): + _java_obj.set(param_name, param_val) if self.subModels is not None: java_sub_models = [sub_model._to_java() for sub_model in self.subModels] From e7d79be32c93d2e917fa3416ef744091e753bebc Mon Sep 17 00:00:00 2001 From: Louiszr Date: Mon, 17 Aug 2020 23:25:05 +0100 Subject: [PATCH 08/11] Linting --- python/pyspark/ml/tests/test_tuning.py | 2 +- python/pyspark/ml/tuning.py | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 779333244f92c..98220bc80d4a7 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -120,7 +120,7 @@ def test_copy(self): lambda x: x.getSeed() ]: self.assertEqual(param(cvModel), param(cvModelCopied)) - + cvModel.avgMetrics[0] = 'foo' self.assertNotEqual( cvModelCopied.avgMetrics[0], diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index b443c4ad099b1..f1fefb5cb0aab 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -891,7 +891,10 @@ def copy(self, extra=None): bestModel = self.bestModel.copy(extra) validationMetrics = list(self.validationMetrics) subModels = [model.copy() for model in self.subModels] - return self._copyValues(TrainValidationSplitModel(bestModel, validationMetrics, subModels), extra=extra) + return self._copyValues( + TrainValidationSplitModel(bestModel, validationMetrics, subModels), + extra=extra + ) @since("2.3.0") def write(self): From ba994fdf92215b65e8615e0e21c6e6b6fabd8e5e Mon Sep 17 00:00:00 2001 From: Louiszr Date: Wed, 19 Aug 2020 20:46:53 +0100 Subject: [PATCH 09/11] Used param pairs for type conversion --- python/pyspark/ml/tests/test_tuning.py | 6 ++++-- python/pyspark/ml/tuning.py | 8 ++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index 98220bc80d4a7..f61a7a9644b9a 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -206,7 +206,8 @@ def test_save_load_trained_model(self): estimatorParamMaps=grid, evaluator=evaluator, collectSubModels=True, - numFolds=4 + numFolds=4, + seed=42 ) cvModel = cv.fit(dataset) lrModel = cvModel.bestModel @@ -580,7 +581,8 @@ def test_save_load_trained_model(self): estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, - collectSubModels=True + collectSubModels=True, + seed=42 ) tvsModel = tvs.fit(dataset) lrModel = tvsModel.bestModel diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index f1fefb5cb0aab..466491d046692 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -603,7 +603,9 @@ def _to_java(self): "seed": self.getSeed(), } for param_name, param_val in params.items(): - _java_obj.set(param_name, param_val) + java_param = _java_obj.getParam(param_name) + pair = java_param.w(param_val) + _java_obj.set(pair) if self.subModels is not None: java_sub_models = [[sub_model._to_java() for sub_model in fold_sub_models] @@ -962,7 +964,9 @@ def _to_java(self): "seed": self.getSeed(), } for param_name, param_val in params.items(): - _java_obj.set(param_name, param_val) + java_param = _java_obj.getParam(param_name) + pair = java_param.w(param_val) + _java_obj.set(pair) if self.subModels is not None: java_sub_models = [sub_model._to_java() for sub_model in self.subModels] From fcfac36177c73b7a645219bf8f5914ea5e11bccc Mon Sep 17 00:00:00 2001 From: Louiszr Date: Wed, 19 Aug 2020 22:47:29 +0100 Subject: [PATCH 10/11] Made save load tests more robust to new params --- python/pyspark/ml/tests/test_tuning.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index f61a7a9644b9a..c4d518cc473bf 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -230,6 +230,10 @@ def test_save_load_trained_model(self): ]: self.assertEqual(param(cvModel), param(loadedCvModel)) + self.assertTrue(all( + loadedCvModel.isSet(param) for param in loadedCvModel.params + )) + def test_save_load_simple_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( @@ -602,6 +606,10 @@ def test_save_load_trained_model(self): ]: self.assertEqual(param(tvsModel), param(loadedTvsModel)) + self.assertTrue(all( + loadedTvsModel.isSet(param) for param in loadedTvsModel.params + )) + def test_save_load_simple_estimator(self): # This tests saving and loading the trained model only. # Save/load for TrainValidationSplit will be added later: SPARK-13786 From 8ae74d000ac48d6e293feb4bc3dac31088bf6c6c Mon Sep 17 00:00:00 2001 From: Louiszr Date: Thu, 20 Aug 2020 22:15:16 +0100 Subject: [PATCH 11/11] Fix typo --- python/pyspark/ml/tests/test_tuning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/tests/test_tuning.py b/python/pyspark/ml/tests/test_tuning.py index c4d518cc473bf..0937e4707eab1 100644 --- a/python/pyspark/ml/tests/test_tuning.py +++ b/python/pyspark/ml/tests/test_tuning.py @@ -113,7 +113,7 @@ def test_copy(self): for index in range(len(cvModel.avgMetrics)): self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index]) < 0.0001) - # SPARK-32092: CrossValidator.copy() needs to copy all existing params + # SPARK-32092: CrossValidatorModel.copy() needs to copy all existing params for param in [ lambda x: x.getNumFolds(), lambda x: x.getFoldCol(),