Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ class CrossValidatorSuite
cv.setParallelism(2)
val cvParallelModel = cv.fit(dataset)

val serialMetrics = cvSerialModel.avgMetrics.sorted
val parallelMetrics = cvParallelModel.avgMetrics.sorted
val serialMetrics = cvSerialModel.avgMetrics
val parallelMetrics = cvParallelModel.avgMetrics
assert(serialMetrics === parallelMetrics)

val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ class TrainValidationSplitSuite
cv.setParallelism(2)
val cvParallelModel = cv.fit(dataset)

val serialMetrics = cvSerialModel.validationMetrics.sorted
val parallelMetrics = cvParallelModel.validationMetrics.sorted
val serialMetrics = cvSerialModel.validationMetrics
val parallelMetrics = cvParallelModel.validationMetrics
assert(serialMetrics === parallelMetrics)

val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression]
Expand Down
39 changes: 39 additions & 0 deletions python/pyspark/ml/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,27 @@ def test_save_load_simple_estimator(self):
loadedModel = CrossValidatorModel.load(cvModelPath)
self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid)

def test_parallel_evaluation(self):
    """CrossValidator must produce identical avgMetrics whether models are
    evaluated serially (parallelism=1) or in parallel (parallelism=2)."""
    dataset = self.spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
    evaluator = BinaryClassificationEvaluator()

    # Fit the same CrossValidator twice, once serially and once with a
    # thread pool of 2; the fold-averaged metrics must not depend on
    # the evaluation parallelism.
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
    cv.setParallelism(1)
    cvSerialModel = cv.fit(dataset)
    cv.setParallelism(2)
    cvParallelModel = cv.fit(dataset)
    self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)

def test_save_load_nested_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
Expand Down Expand Up @@ -986,6 +1007,24 @@ def test_save_load_simple_estimator(self):
loadedModel = TrainValidationSplitModel.load(tvsModelPath)
self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid)

def test_parallel_evaluation(self):
    """Fitting TrainValidationSplit serially and with parallelism=2 must
    yield the same validation metrics."""
    rows = [(Vectors.dense([0.0]), 0.0),
            (Vectors.dense([0.4]), 1.0),
            (Vectors.dense([0.5]), 0.0),
            (Vectors.dense([0.6]), 1.0),
            (Vectors.dense([1.0]), 1.0)] * 10
    dataset = self.spark.createDataFrame(rows, ["features", "label"])

    lr = LogisticRegression()
    param_maps = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
    tvs = TrainValidationSplit(estimator=lr,
                               estimatorParamMaps=param_maps,
                               evaluator=BinaryClassificationEvaluator())

    # Serial fit, then parallel fit of the identical estimator/grid.
    tvs.setParallelism(1)
    serial_model = tvs.fit(dataset)
    tvs.setParallelism(2)
    parallel_model = tvs.fit(dataset)
    self.assertEqual(serial_model.validationMetrics,
                     parallel_model.validationMetrics)

def test_save_load_nested_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
Expand Down
86 changes: 53 additions & 33 deletions python/pyspark/ml/tuning.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

import itertools
import numpy as np
from multiprocessing.pool import ThreadPool

from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
from pyspark.ml.common import _py2java
from pyspark.ml.param import Params, Param, TypeConverters
from pyspark.ml.param.shared import HasSeed
from pyspark.ml.param.shared import HasParallelism, HasSeed
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaParams
from pyspark.sql.functions import rand
Expand Down Expand Up @@ -170,7 +170,7 @@ def _to_java_impl(self):
return java_estimator, java_epms, java_evaluator


class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
"""

K-fold cross validation performs model selection by splitting the dataset into a set of
Expand All @@ -193,7 +193,8 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):
>>> lr = LogisticRegression()
>>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
>>> evaluator = BinaryClassificationEvaluator()
>>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
>>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
... parallelism=2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you planning on adding a unit test to verify that parallel has the same results as serial?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test added.

>>> cvModel = cv.fit(dataset)
>>> cvModel.avgMetrics[0]
0.5
Expand All @@ -208,23 +209,23 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable):

@keyword_only
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
seed=None):
seed=None, parallelism=1):
"""
__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
seed=None)
seed=None, parallelism=1)
"""
super(CrossValidator, self).__init__()
self._setDefault(numFolds=3)
self._setDefault(numFolds=3, parallelism=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the param is not being passed to Java, should we check that it is >=1 here and in setParam?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a check when creating the thread pool.

kwargs = self._input_kwargs
self._set(**kwargs)

@keyword_only
@since("1.4.0")
def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,
seed=None):
seed=None, parallelism=1):
"""
setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\
seed=None):
seed=None, parallelism=1):
Sets params for cross validator.
"""
kwargs = self._input_kwargs
Expand Down Expand Up @@ -255,18 +256,27 @@ def _fit(self, dataset):
randCol = self.uid + "_rand"
df = dataset.select("*", rand(seed).alias(randCol))
metrics = [0.0] * numModels

pool = ThreadPool(processes=min(self.getParallelism(), numModels))

for i in range(nFolds):
validateLB = i * h
validateUB = (i + 1) * h
condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB)
validation = df.filter(condition)
train = df.filter(~condition)
models = est.fit(train, epm)
for j in range(numModels):
model = models[j]
validation = df.filter(condition).cache()
train = df.filter(~condition).cache()

def singleTrain(paramMap):
model = est.fit(train, paramMap)
# TODO: duplicate evaluator to take extra params from input
metric = eva.evaluate(model.transform(validation, epm[j]))
metrics[j] += metric/nFolds
metric = eva.evaluate(model.transform(validation, paramMap))
return metric

currentFoldMetrics = pool.map(singleTrain, epm)
for j in range(numModels):
metrics[j] += (currentFoldMetrics[j] / nFolds)
validation.unpersist()
train.unpersist()

if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
Expand Down Expand Up @@ -316,9 +326,10 @@ def _from_java(cls, java_stage):
estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage)
numFolds = java_stage.getNumFolds()
seed = java_stage.getSeed()
parallelism = java_stage.getParallelism()
# Create a new instance of this stage.
py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
numFolds=numFolds, seed=seed)
numFolds=numFolds, seed=seed, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage

Expand All @@ -337,6 +348,7 @@ def _to_java(self):
_java_obj.setEstimator(estimator)
_java_obj.setSeed(self.getSeed())
_java_obj.setNumFolds(self.getNumFolds())
_java_obj.setParallelism(self.getParallelism())

return _java_obj

Expand Down Expand Up @@ -427,7 +439,7 @@ def _to_java(self):
return _java_obj


class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable):
"""
.. note:: Experimental

Expand All @@ -448,7 +460,8 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):
>>> lr = LogisticRegression()
>>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
>>> evaluator = BinaryClassificationEvaluator()
>>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)
>>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
... parallelism=2)
>>> tvsModel = tvs.fit(dataset)
>>> evaluator.evaluate(tvsModel.transform(dataset))
0.8333...
Expand All @@ -461,23 +474,23 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable):

@keyword_only
def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
seed=None):
parallelism=1, seed=None):
"""
__init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
seed=None)
parallelism=1, seed=None)
"""
super(TrainValidationSplit, self).__init__()
self._setDefault(trainRatio=0.75)
self._setDefault(trainRatio=0.75, parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)

@since("2.0.0")
@keyword_only
def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,
seed=None):
parallelism=1, seed=None):
"""
setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\
seed=None):
parallelism=1, seed=None):
Sets params for the train validation split.
"""
kwargs = self._input_kwargs
Expand Down Expand Up @@ -506,15 +519,20 @@ def _fit(self, dataset):
seed = self.getOrDefault(self.seed)
randCol = self.uid + "_rand"
df = dataset.select("*", rand(seed).alias(randCol))
metrics = [0.0] * numModels
condition = (df[randCol] >= tRatio)
validation = df.filter(condition)
train = df.filter(~condition)
models = est.fit(train, epm)
for j in range(numModels):
model = models[j]
metric = eva.evaluate(model.transform(validation, epm[j]))
metrics[j] += metric
validation = df.filter(condition).cache()
train = df.filter(~condition).cache()

def singleTrain(paramMap):
model = est.fit(train, paramMap)
metric = eva.evaluate(model.transform(validation, paramMap))
return metric

pool = ThreadPool(processes=min(self.getParallelism(), numModels))
metrics = pool.map(singleTrain, epm)
train.unpersist()
validation.unpersist()

if eva.isLargerBetter():
bestIndex = np.argmax(metrics)
else:
Expand Down Expand Up @@ -563,9 +581,10 @@ def _from_java(cls, java_stage):
estimator, epms, evaluator = super(TrainValidationSplit, cls)._from_java_impl(java_stage)
trainRatio = java_stage.getTrainRatio()
seed = java_stage.getSeed()
parallelism = java_stage.getParallelism()
# Create a new instance of this stage.
py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator,
trainRatio=trainRatio, seed=seed)
trainRatio=trainRatio, seed=seed, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage

Expand All @@ -584,6 +603,7 @@ def _to_java(self):
_java_obj.setEstimator(estimator)
_java_obj.setTrainRatio(self.getTrainRatio())
_java_obj.setSeed(self.getSeed())
_java_obj.setParallelism(self.getParallelism())

return _java_obj

Expand Down