@@ -270,6 +270,17 @@ class CrossValidatorModel private[ml] (
    this
  }

  // A Python-friendly auxiliary method
  private[tuning] def setSubModels(subModels: JList[JList[Model[_]]])
    : CrossValidatorModel = {
    _subModels = if (subModels != null) {
      Some(subModels.asScala.toArray.map(_.asScala.toArray))
    } else {
      None
    }
    this
  }

  /**
   * @return submodels represented in a two-dimensional array. The index of the outer array is
   *         the fold index, and the index of the inner array corresponds to the ordering of
   *         estimatorParamMaps
   */
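The overload takes JList[JList[Model[_]]] rather than Scala collections because py4j hands Python lists across the gateway as java.util.List. A minimal sketch of the hand-off this enables on the Python side, assuming the usual PySpark pattern where each Python model wraps a Java model reachable via _to_java(); names like py_model and java_obj are illustrative, not the exact PySpark internals:

    # PySpark starts its py4j gateway with auto_convert=True, so plain Python
    # lists are converted to java.util.ArrayList automatically on the way in.
    if py_model.subModels is not None:
        java_sub_models = [[m._to_java() for m in fold_models]
                           for fold_models in py_model.subModels]
        java_obj.setSubModels(java_sub_models)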
@@ -262,6 +262,17 @@ class TrainValidationSplitModel private[ml] (
    this
  }

  // A Python-friendly auxiliary method
  private[tuning] def setSubModels(subModels: JList[Model[_]])
    : TrainValidationSplitModel = {
    _subModels = if (subModels != null) {
      Some(subModels.asScala.toArray)
    } else {
      None
    }
    this
  }

  /**
   * @return submodels represented in an array. The index of the array corresponds to the
   *         ordering of estimatorParamMaps
   */
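TrainValidationSplit has only a single train/validation split, so its overload is the flat analogue; the corresponding Python-side hand-off would be one-dimensional (again a sketch with illustrative names):

    # Flat (1-D) counterpart for TrainValidationSplitModel; the same automatic
    # list-to-ArrayList conversion applies.
    if py_model.subModels is not None:
        java_obj.setSubModels([m._to_java() for m in py_model.subModels])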
5 changes: 5 additions & 0 deletions python/pyspark/ml/param/_shared_params_code_gen.py
@@ -157,6 +157,11 @@ def get$Name(self):
"TypeConverters.toInt"),
("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
"1", "TypeConverters.toInt"),
("collectSubModels", "Param for whether to collect a list of sub-models trained during " +
"tuning. If set to false, then only the single best sub-model will be available after " +
"fitting. If set to true, then all sub-models will be available. Warning: For large " +
"models, collecting all sub-models can cause OOMs on the Spark driver.",
"False", "TypeConverters.toBoolean"),
("loss", "the loss function to be optimized.", None, "TypeConverters.toString")]

code = []
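Each tuple here is (name, doc, defaultValueStr, typeConverter), and running the generator expands it into the Has<Name> mixin added to shared.py below. A simplified sketch of that expansion; gen_mixin is a stand-in, and the real template in _shared_params_code_gen.py differs in detail:

    def gen_mixin(name, doc, default, converter):
        # Simplified stand-in for the real code-gen template.
        cap = name[0].upper() + name[1:]
        lines = [
            "class Has%s(Params):" % cap,
            '    """',
            "    Mixin for param %s: %s" % (name, doc),
            '    """',
            "",
            '    %s = Param(Params._dummy(), "%s", "%s", typeConverter=%s)'
            % (name, name, doc, converter),
            "",
            "    def __init__(self):",
            "        super(Has%s, self).__init__()" % cap,
        ]
        if default is not None:
            lines.append("        self._setDefault(%s=%s)" % (name, default))
        return "\n".join(lines)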
24 changes: 24 additions & 0 deletions python/pyspark/ml/param/shared.py
@@ -655,6 +655,30 @@ def getParallelism(self):
        return self.getOrDefault(self.parallelism)


class HasCollectSubModels(Params):
    """
    Mixin for param collectSubModels: Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.
    """

    collectSubModels = Param(Params._dummy(), "collectSubModels", "Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.", typeConverter=TypeConverters.toBoolean)

    def __init__(self):
        super(HasCollectSubModels, self).__init__()
        self._setDefault(collectSubModels=False)

    def setCollectSubModels(self, value):
        """
        Sets the value of :py:attr:`collectSubModels`.
        """
        return self._set(collectSubModels=value)

    def getCollectSubModels(self):
        """
        Gets the value of collectSubModels or its default value.
        """
        return self.getOrDefault(self.collectSubModels)
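With the mixin in place, CrossValidator and TrainValidationSplit expose a uniform getter/setter pair. A short usage sketch, assuming lr, grid, and evaluator are set up as in the tests below:

    from pyspark.ml.tuning import CrossValidator

    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                        evaluator=evaluator, collectSubModels=True)
    assert cv.getCollectSubModels()                     # explicit opt-in
    assert not CrossValidator().getCollectSubModels()   # default is False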


class HasLoss(Params):
    """
    Mixin for param loss: the loss function to be optimized.
78 changes: 78 additions & 0 deletions python/pyspark/ml/tests.py
@@ -1018,6 +1018,50 @@ def test_parallel_evaluation(self):
        cvParallelModel = cv.fit(dataset)
        self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)

[Review comment, Member] Nice tests. Can you make one addition: Test the copy() method to make sure it copies the submodels.

    def test_expose_sub_models(self):
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])

        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()

        numFolds = 3
        cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                            numFolds=numFolds, collectSubModels=True)

        def checkSubModels(subModels):
            self.assertEqual(len(subModels), numFolds)
            for i in range(numFolds):
                self.assertEqual(len(subModels[i]), len(grid))

        cvModel = cv.fit(dataset)
        checkSubModels(cvModel.subModels)

        # Test that the "persistSubModels" write option defaults to "true"
        testSubPath = temp_path + "/testCrossValidatorSubModels"
        savingPathWithSubModels = testSubPath + "cvModel3"
        cvModel.save(savingPathWithSubModels)
        cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
        checkSubModels(cvModel3.subModels)
        cvModel4 = cvModel3.copy()
        checkSubModels(cvModel4.subModels)

        savingPathWithoutSubModels = testSubPath + "cvModel2"
        cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
        cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
        self.assertEqual(cvModel2.subModels, None)

        for i in range(numFolds):
            for j in range(len(grid)):
                self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)
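The uid assertions above only check identity across save/load; each entry in subModels is a fully trained model that can be inspected directly. A hedged sketch (coefficients is a standard attribute of LogisticRegressionModel):

    # One sub-model per (fold, param-map) pair; e.g. compare fitted coefficients.
    for fold, fold_models in enumerate(cvModel.subModels):
        for j, model in enumerate(fold_models):
            print(fold, j, model.coefficients)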

    def test_save_load_nested_estimator(self):
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
@@ -1186,6 +1230,40 @@ def test_parallel_evaluation(self):
        tvsParallelModel = tvs.fit(dataset)
        self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)

    def test_expose_sub_models(self):
        temp_path = tempfile.mkdtemp()
        dataset = self.spark.createDataFrame(
            [(Vectors.dense([0.0]), 0.0),
             (Vectors.dense([0.4]), 1.0),
             (Vectors.dense([0.5]), 0.0),
             (Vectors.dense([0.6]), 1.0),
             (Vectors.dense([1.0]), 1.0)] * 10,
            ["features", "label"])
        lr = LogisticRegression()
        grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
        evaluator = BinaryClassificationEvaluator()
        tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                                   collectSubModels=True)
        tvsModel = tvs.fit(dataset)
        self.assertEqual(len(tvsModel.subModels), len(grid))

        # Test that the "persistSubModels" write option defaults to "true"
        testSubPath = temp_path + "/testTrainValidationSplitSubModels"
        savingPathWithSubModels = testSubPath + "cvModel3"
        tvsModel.save(savingPathWithSubModels)
        tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
        self.assertEqual(len(tvsModel3.subModels), len(grid))
        tvsModel4 = tvsModel3.copy()
        self.assertEqual(len(tvsModel4.subModels), len(grid))

        savingPathWithoutSubModels = testSubPath + "cvModel2"
        tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
        tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
        self.assertEqual(tvsModel2.subModels, None)

        for i in range(len(grid)):
            self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)
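The "persistSubModels" write option defaults to "true" whenever sub-models were collected; an explicit opt-in mirrors the opt-out used above (save_path is illustrative):

    tvsModel.write().option("persistSubModels", "true").overwrite().save(save_path)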

    def test_save_load_nested_estimator(self):
        # This tests saving and loading the trained model only.
        # Save/load for TrainValidationSplit will be added later: SPARK-13786