Skip to content
131 changes: 120 additions & 11 deletions python/pyspark/ml/tests/test_tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,50 @@ def test_copy(self):
grid = (ParamGridBuilder()
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0])
.build())
cv = CrossValidator(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
cv = CrossValidator(
estimator=iee,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True,
numFolds=2
)
cvCopied = cv.copy()
self.assertEqual(cv.getEstimator().uid, cvCopied.getEstimator().uid)
for param in [
lambda x: x.getEstimator().uid,
# SPARK-32092: CrossValidator.copy() needs to copy all existing params
lambda x: x.getNumFolds(),
lambda x: x.getFoldCol(),
lambda x: x.getCollectSubModels(),
lambda x: x.getParallelism(),
lambda x: x.getSeed()
]:
self.assertEqual(param(cv), param(cvCopied))

cvModel = cv.fit(dataset)
cvModelCopied = cvModel.copy()
for index in range(len(cvModel.avgMetrics)):
self.assertTrue(abs(cvModel.avgMetrics[index] - cvModelCopied.avgMetrics[index])
< 0.0001)
# SPARK-32092: CrossValidatorModel.copy() needs to copy all existing params
for param in [
lambda x: x.getNumFolds(),
lambda x: x.getFoldCol(),
lambda x: x.getSeed()
]:
self.assertEqual(param(cvModel), param(cvModelCopied))

cvModel.avgMetrics[0] = 'foo'
self.assertNotEqual(
cvModelCopied.avgMetrics[0],
'foo',
"Changing the original avgMetrics should not affect the copied model"
)
cvModel.subModels[0] = 'foo'
self.assertNotEqual(
cvModelCopied.subModels[0],
'foo',
"Changing the original subModels should not affect the copied model"
)

def test_fit_minimize_metric(self):
dataset = self.spark.createDataFrame([
Expand Down Expand Up @@ -166,16 +201,39 @@ def test_save_load_trained_model(self):
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
cv = CrossValidator(
estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True,
numFolds=4,
seed=42
)
cvModel = cv.fit(dataset)
lrModel = cvModel.bestModel

cvModelPath = temp_path + "/cvModel"
lrModel.save(cvModelPath)
loadedLrModel = LogisticRegressionModel.load(cvModelPath)
lrModelPath = temp_path + "/lrModel"
lrModel.save(lrModelPath)
loadedLrModel = LogisticRegressionModel.load(lrModelPath)
self.assertEqual(loadedLrModel.uid, lrModel.uid)
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

# SPARK-32092: Saving and then loading CrossValidatorModel should not change the params
cvModelPath = temp_path + "/cvModel"
cvModel.save(cvModelPath)
loadedCvModel = CrossValidatorModel.load(cvModelPath)
for param in [
lambda x: x.getNumFolds(),
lambda x: x.getFoldCol(),
lambda x: x.getSeed(),
lambda x: len(x.subModels)
]:
self.assertEqual(param(cvModel), param(loadedCvModel))

self.assertTrue(all(
loadedCvModel.isSet(param) for param in loadedCvModel.params
))

def test_save_load_simple_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
Expand Down Expand Up @@ -523,16 +581,35 @@ def test_save_load_trained_model(self):
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
tvs = TrainValidationSplit(
estimator=lr,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True,
seed=42
)
tvsModel = tvs.fit(dataset)
lrModel = tvsModel.bestModel

tvsModelPath = temp_path + "/tvsModel"
lrModel.save(tvsModelPath)
loadedLrModel = LogisticRegressionModel.load(tvsModelPath)
lrModelPath = temp_path + "/lrModel"
lrModel.save(lrModelPath)
loadedLrModel = LogisticRegressionModel.load(lrModelPath)
self.assertEqual(loadedLrModel.uid, lrModel.uid)
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)

tvsModelPath = temp_path + "/tvsModel"
tvsModel.save(tvsModelPath)
loadedTvsModel = TrainValidationSplitModel.load(tvsModelPath)
for param in [
lambda x: x.getSeed(),
lambda x: x.getTrainRatio(),
]:
self.assertEqual(param(tvsModel), param(loadedTvsModel))

self.assertTrue(all(
loadedTvsModel.isSet(param) for param in loadedTvsModel.params
))

def test_save_load_simple_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
Expand Down Expand Up @@ -734,11 +811,30 @@ def test_copy(self):
grid = ParamGridBuilder() \
.addGrid(iee.inducedError, [100.0, 0.0, 10000.0]) \
.build()
tvs = TrainValidationSplit(estimator=iee, estimatorParamMaps=grid, evaluator=evaluator)
tvs = TrainValidationSplit(
estimator=iee,
estimatorParamMaps=grid,
evaluator=evaluator,
collectSubModels=True
)
tvsModel = tvs.fit(dataset)
tvsCopied = tvs.copy()
tvsModelCopied = tvsModel.copy()

for param in [
lambda x: x.getCollectSubModels(),
lambda x: x.getParallelism(),
lambda x: x.getSeed(),
lambda x: x.getTrainRatio(),
]:
self.assertEqual(param(tvs), param(tvsCopied))

for param in [
lambda x: x.getSeed(),
lambda x: x.getTrainRatio(),
]:
self.assertEqual(param(tvsModel), param(tvsModelCopied))

self.assertEqual(tvs.getEstimator().uid, tvsCopied.getEstimator().uid,
"Copied TrainValidationSplit has the same uid of Estimator")

Expand All @@ -750,6 +846,19 @@ def test_copy(self):
self.assertEqual(tvsModel.validationMetrics[index],
tvsModelCopied.validationMetrics[index])

tvsModel.validationMetrics[0] = 'foo'
self.assertNotEqual(
tvsModelCopied.validationMetrics[0],
'foo',
"Changing the original validationMetrics should not affect the copied model"
)
tvsModel.subModels[0] = 'foo'
self.assertNotEqual(
tvsModelCopied.subModels[0],
'foo',
"Changing the original subModels should not affect the copied model"
)


if __name__ == "__main__":
from pyspark.ml.tests.test_tuning import * # noqa: F401
Expand Down
67 changes: 52 additions & 15 deletions python/pyspark/ml/tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,9 +534,9 @@ def copy(self, extra=None):
if extra is None:
extra = dict()
bestModel = self.bestModel.copy(extra)
avgMetrics = self.avgMetrics
subModels = self.subModels
return CrossValidatorModel(bestModel, avgMetrics, subModels)
avgMetrics = list(self.avgMetrics)
subModels = [model.copy() for model in self.subModels]
return self._copyValues(CrossValidatorModel(bestModel, avgMetrics, subModels), extra=extra)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use Params.copy like CrossValidator?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can. However I just found one potential issue with using Params.copy (not specific to CrossValidator). It creates a shallow copy of self (i.e. the models). Hence if we run the below snippet

cvModelCopied = cvModel.copy()
cvModel.avgMetrics[0] = 'foo'
assert cvModelCopied.avgMetrics[0] != 'foo'  # This will fail

Based on the Scala equivalent I think avgMetrics should be shallow copied and subModels should be copied with the copying actions delegated to copy() of each model. I will push a change to this function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean that avgMetrics should, or should not, be shallow copied?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

avgMetrics should be shallow copied, as tested in https://github.com/Louiszr/spark/blob/8ae74d000ac48d6e293feb4bc3dac31088bf6c6c/python/pyspark/ml/tests/test_tuning.py#L124-L129
I think Params.copy will shallow copy the CrossValidatorModel object and thus only copy the reference to avgMetrics

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm? I think the test makes sure it isn't a shallow copy but a deep copy, doesn't it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By shallow copy I mean copy.copy() in python, which makes re-assigning cvModel.avgMetrics[0] not being propagated to cvModelCopied.avgMetrics[0].
I am also using copy.deepcopy() as the reference for deep copy. If cvModel.avgMetrics[0] is a class instance, then a shallow copy will point to the same instance, while a deep copy will create a copy of the instance.
I think here it doesn't make a difference because avgMetrics is a list of float, but in the future if it does become a list of objects then a shallow copy implementation will be sufficient to pass the test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I get your point above. You meant that if we just shallow copy the model itself, reassigning an element of avgMetrics will be propagated to the avgMetrics of the copied model, because the two models share the same avgMetrics reference.

You want to shallow copy avgMetrics itself, so that the two models hold different avgMetrics references. Because avgMetrics is a list of floats, it does not matter whether the copy is shallow or deep.

By shallow copy I mean copy.copy() in python, which makes re-assigning cvModel.avgMetrics[0] not being propagated to cvModelCopied.avgMetrics[0].

Whether it is a deep copy or a shallow copy, I think reassigning avgMetrics[0] won't propagate to avgMetrics[0] of the copied model. A shallow copy copies object references, and reassigning replaces a reference, so it won't propagate. A deep copy copies object instances, and reassigning likewise replaces a reference, so of course it won't propagate either.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed with the above.


@since("2.3.0")
def write(self):
Expand All @@ -560,8 +560,17 @@ def _from_java(cls, java_stage):
avgMetrics = _java2py(sc, java_stage.avgMetrics())
estimator, epms, evaluator = super(CrossValidatorModel, cls)._from_java_impl(java_stage)

py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)._set(estimator=estimator)
py_stage = py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator)
py_stage = cls(bestModel=bestModel, avgMetrics=avgMetrics)
params = {
"evaluator": evaluator,
"estimator": estimator,
"estimatorParamMaps": epms,
"numFolds": java_stage.getNumFolds(),
"foldCol": java_stage.getFoldCol(),
"seed": java_stage.getSeed(),
}
for param_name, param_val in params.items():
py_stage = py_stage._set(**{param_name: param_val})

if java_stage.hasSubModels():
py_stage.subModels = [[JavaParams._from_java(sub_model)
Expand All @@ -585,9 +594,18 @@ def _to_java(self):
_py2java(sc, self.avgMetrics))
estimator, epms, evaluator = super(CrossValidatorModel, self)._to_java_impl()

_java_obj.set("evaluator", evaluator)
_java_obj.set("estimator", estimator)
_java_obj.set("estimatorParamMaps", epms)
params = {
"evaluator": evaluator,
"estimator": estimator,
"estimatorParamMaps": epms,
"numFolds": self.getNumFolds(),
"foldCol": self.getFoldCol(),
"seed": self.getSeed(),
}
for param_name, param_val in params.items():
java_param = _java_obj.getParam(param_name)
pair = java_param.w(param_val)
_java_obj.set(pair)

if self.subModels is not None:
java_sub_models = [[sub_model._to_java() for sub_model in fold_sub_models]
Expand Down Expand Up @@ -874,8 +892,11 @@ def copy(self, extra=None):
extra = dict()
bestModel = self.bestModel.copy(extra)
validationMetrics = list(self.validationMetrics)
subModels = self.subModels
return TrainValidationSplitModel(bestModel, validationMetrics, subModels)
subModels = [model.copy() for model in self.subModels]
return self._copyValues(
TrainValidationSplitModel(bestModel, validationMetrics, subModels),
extra=extra
)

@since("2.3.0")
def write(self):
Expand Down Expand Up @@ -903,8 +924,16 @@ def _from_java(cls, java_stage):
cls)._from_java_impl(java_stage)
# Create a new instance of this stage.
py_stage = cls(bestModel=bestModel,
validationMetrics=validationMetrics)._set(estimator=estimator)
py_stage = py_stage._set(estimatorParamMaps=epms)._set(evaluator=evaluator)
validationMetrics=validationMetrics)
params = {
"evaluator": evaluator,
"estimator": estimator,
"estimatorParamMaps": epms,
"trainRatio": java_stage.getTrainRatio(),
"seed": java_stage.getSeed(),
}
for param_name, param_val in params.items():
py_stage = py_stage._set(**{param_name: param_val})

if java_stage.hasSubModels():
py_stage.subModels = [JavaParams._from_java(sub_model)
Expand All @@ -927,9 +956,17 @@ def _to_java(self):
_py2java(sc, self.validationMetrics))
estimator, epms, evaluator = super(TrainValidationSplitModel, self)._to_java_impl()

_java_obj.set("evaluator", evaluator)
_java_obj.set("estimator", estimator)
_java_obj.set("estimatorParamMaps", epms)
params = {
"evaluator": evaluator,
"estimator": estimator,
"estimatorParamMaps": epms,
"trainRatio": self.getTrainRatio(),
"seed": self.getSeed(),
}
for param_name, param_val in params.items():
java_param = _java_obj.getParam(param_name)
pair = java_param.w(param_val)
_java_obj.set(pair)

if self.subModels is not None:
java_sub_models = [sub_model._to_java() for sub_model in self.subModels]
Expand Down