From 44f43325e4510e32241a09a28b8c9d24ef831a38 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 5 Sep 2017 00:03:55 +0800 Subject: [PATCH 01/10] init pr --- python/pyspark/ml/tuning.py | 70 ++++++++++++++++++++++--------------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 00c348aa9f7d..352020b2a412 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -14,15 +14,16 @@ # See the License for the specific language governing permissions and # limitations under the License. # - import itertools import numpy as np +from multiprocessing.pool import ThreadPool + from pyspark import since, keyword_only from pyspark.ml import Estimator, Model from pyspark.ml.common import _py2java from pyspark.ml.param import Params, Param, TypeConverters -from pyspark.ml.param.shared import HasSeed +from pyspark.ml.param.shared import HasSeed, HasParallelism from pyspark.ml.util import * from pyspark.ml.wrapper import JavaParams from pyspark.sql.functions import rand @@ -170,7 +171,7 @@ def _to_java_impl(self): return java_estimator, java_epms, java_evaluator -class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): +class CrossValidator(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable): """ K-fold cross validation performs model selection by splitting the dataset into a set of @@ -193,7 +194,8 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): >>> lr = LogisticRegression() >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() >>> evaluator = BinaryClassificationEvaluator() - >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + >>> cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, + ... parallelism=2) >>> cvModel = cv.fit(dataset) >>> cvModel.avgMetrics[0] 0.5 @@ -208,23 +210,23 @@ class CrossValidator(Estimator, ValidatorParams, MLReadable, MLWritable): @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, - seed=None): + seed=None, parallelism=1): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ - seed=None) + seed=None, parallelism=1) """ super(CrossValidator, self).__init__() - self._setDefault(numFolds=3) + self._setDefault(numFolds=3, parallelism=1) kwargs = self._input_kwargs self._set(**kwargs) @keyword_only @since("1.4.0") def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3, - seed=None): + seed=None, parallelism=1): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, numFolds=3,\ - seed=None): + seed=None, parallelism=1): Sets params for cross validator. 
""" kwargs = self._input_kwargs @@ -255,18 +257,23 @@ def _fit(self, dataset): randCol = self.uid + "_rand" df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels + + pool = ThreadPool(processes=min(self.getParallelism(), numModels)) + for i in range(nFolds): validateLB = i * h validateUB = (i + 1) * h condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) - validation = df.filter(condition) + validation = df.filter(condition).cache() train = df.filter(~condition) - models = est.fit(train, epm) - for j in range(numModels): - model = models[j] + + def singleTrain(index): + model = est.fit(train, epm[index]) # TODO: duplicate evaluator to take extra params from input - metric = eva.evaluate(model.transform(validation, epm[j])) - metrics[j] += metric/nFolds + metric = eva.evaluate(model.transform(validation, epm[index])) + metrics[index] += metric/nFolds + + pool.map(singleTrain, range(numModels)) if eva.isLargerBetter(): bestIndex = np.argmax(metrics) @@ -427,7 +434,7 @@ def _to_java(self): return _java_obj -class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): +class TrainValidationSplit(Estimator, ValidatorParams, HasParallelism, MLReadable, MLWritable): """ .. note:: Experimental @@ -448,7 +455,8 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): >>> lr = LogisticRegression() >>> grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() >>> evaluator = BinaryClassificationEvaluator() - >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + >>> tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator, + ... parallelism=2) >>> tvsModel = tvs.fit(dataset) >>> evaluator.evaluate(tvsModel.transform(dataset)) 0.8333... @@ -461,23 +469,23 @@ class TrainValidationSplit(Estimator, ValidatorParams, MLReadable, MLWritable): @keyword_only def __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, - seed=None): + parallelism=1, seed=None): """ __init__(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ - seed=None) + parallelism=1, seed=None) """ super(TrainValidationSplit, self).__init__() - self._setDefault(trainRatio=0.75) + self._setDefault(trainRatio=0.75, parallelism=1) kwargs = self._input_kwargs self._set(**kwargs) @since("2.0.0") @keyword_only def setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75, - seed=None): + parallelism=1, seed=None): """ setParams(self, estimator=None, estimatorParamMaps=None, evaluator=None, trainRatio=0.75,\ - seed=None): + parallelism=1, seed=None): Sets params for the train validation split. 
""" kwargs = self._input_kwargs @@ -508,13 +516,17 @@ def _fit(self, dataset): df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels condition = (df[randCol] >= tRatio) - validation = df.filter(condition) - train = df.filter(~condition) - models = est.fit(train, epm) - for j in range(numModels): - model = models[j] - metric = eva.evaluate(model.transform(validation, epm[j])) - metrics[j] += metric + validation = df.filter(condition).cache() + train = df.filter(~condition).cache() + + def singleTrain(index): + model = est.fit(train, epm[index]) + metric = eva.evaluate(model.transform(validation, epm[index])) + metrics[index] += metric + + pool = ThreadPool(processes=min(self.getParallelism(), numModels)) + pool.map(singleTrain, range(numModels)) + if eva.isLargerBetter(): bestIndex = np.argmax(metrics) else: From b321534485101e6708adf07d125d5e2fb4edad15 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 5 Sep 2017 00:26:42 +0800 Subject: [PATCH 02/10] update --- python/pyspark/ml/tuning.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 352020b2a412..af4b8c0abd15 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -274,6 +274,7 @@ def singleTrain(index): metrics[index] += metric/nFolds pool.map(singleTrain, range(numModels)) + validation.unpersist() if eva.isLargerBetter(): bestIndex = np.argmax(metrics) @@ -526,6 +527,8 @@ def singleTrain(index): pool = ThreadPool(processes=min(self.getParallelism(), numModels)) pool.map(singleTrain, range(numModels)) + train.unpersist() + validation.unpersist() if eva.isLargerBetter(): bestIndex = np.argmax(metrics) From d5209c4a576444b513e266def5b0dcff0eb6a8b2 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 5 Sep 2017 22:45:18 +0800 Subject: [PATCH 03/10] improve code in thread --- python/pyspark/ml/tuning.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index af4b8c0abd15..f9d0c1ba6244 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -271,9 +271,11 @@ def singleTrain(index): model = est.fit(train, epm[index]) # TODO: duplicate evaluator to take extra params from input metric = eva.evaluate(model.transform(validation, epm[index])) - metrics[index] += metric/nFolds + return metric - pool.map(singleTrain, range(numModels)) + currentFoldMetrics = pool.map(singleTrain, range(numModels)) + for k in range(numModels): + metrics[k] += (currentFoldMetrics[k] / nFolds) validation.unpersist() if eva.isLargerBetter(): @@ -515,7 +517,6 @@ def _fit(self, dataset): seed = self.getOrDefault(self.seed) randCol = self.uid + "_rand" df = dataset.select("*", rand(seed).alias(randCol)) - metrics = [0.0] * numModels condition = (df[randCol] >= tRatio) validation = df.filter(condition).cache() train = df.filter(~condition).cache() @@ -523,10 +524,10 @@ def _fit(self, dataset): def singleTrain(index): model = est.fit(train, epm[index]) metric = eva.evaluate(model.transform(validation, epm[index])) - metrics[index] += metric + return metric pool = ThreadPool(processes=min(self.getParallelism(), numModels)) - pool.map(singleTrain, range(numModels)) + metrics = pool.map(singleTrain, range(numModels)) train.unpersist() validation.unpersist() From 6c3debd0391c32907855b4cd3d7ac6ff734a8f27 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 6 Sep 2017 21:17:35 +0800 Subject: [PATCH 04/10] update --- python/pyspark/ml/tuning.py | 3 ++- 1 file 
changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index f9d0c1ba6244..22ad5a37a4c9 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -265,7 +265,7 @@ def _fit(self, dataset): validateUB = (i + 1) * h condition = (df[randCol] >= validateLB) & (df[randCol] < validateUB) validation = df.filter(condition).cache() - train = df.filter(~condition) + train = df.filter(~condition).cache() def singleTrain(index): model = est.fit(train, epm[index]) @@ -277,6 +277,7 @@ def singleTrain(index): for k in range(numModels): metrics[k] += (currentFoldMetrics[k] / nFolds) validation.unpersist() + train.unpersist() if eva.isLargerBetter(): bestIndex = np.argmax(metrics) From 849b6753c1628feba6c63c3c37cd6e1c78aa0540 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 13 Sep 2017 10:44:00 +0800 Subject: [PATCH 05/10] update --- python/pyspark/ml/tuning.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 22ad5a37a4c9..6bbad27e1e02 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -258,7 +258,10 @@ def _fit(self, dataset): df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels - pool = ThreadPool(processes=min(self.getParallelism(), numModels)) + parallelism = self.getParallelism() + if parallelism < 1: + raise ValueError("parallelism should >= 1.") + pool = ThreadPool(processes=min(parallelism, numModels)) for i in range(nFolds): validateLB = i * h @@ -267,13 +270,13 @@ def _fit(self, dataset): validation = df.filter(condition).cache() train = df.filter(~condition).cache() - def singleTrain(index): - model = est.fit(train, epm[index]) + def singleTrain(paramMap): + model = est.fit(train, paramMap) # TODO: duplicate evaluator to take extra params from input - metric = eva.evaluate(model.transform(validation, epm[index])) + metric = eva.evaluate(model.transform(validation, paramMap)) return metric - currentFoldMetrics = pool.map(singleTrain, range(numModels)) + currentFoldMetrics = pool.map(singleTrain, epm) for k in range(numModels): metrics[k] += (currentFoldMetrics[k] / nFolds) validation.unpersist() @@ -522,13 +525,16 @@ def _fit(self, dataset): validation = df.filter(condition).cache() train = df.filter(~condition).cache() - def singleTrain(index): - model = est.fit(train, epm[index]) - metric = eva.evaluate(model.transform(validation, epm[index])) + def singleTrain(paramMap): + model = est.fit(train, paramMap) + metric = eva.evaluate(model.transform(validation, paramMap)) return metric - pool = ThreadPool(processes=min(self.getParallelism(), numModels)) - metrics = pool.map(singleTrain, range(numModels)) + parallelism = self.getParallelism() + if parallelism < 1: + raise ValueError("parallelism should >= 1.") + pool = ThreadPool(processes=min(parallelism, numModels)) + metrics = pool.map(singleTrain, epm) train.unpersist() validation.unpersist() From b03499a64f543a32eff5a21bddb9afe37f951eaf Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 13 Sep 2017 11:41:21 +0800 Subject: [PATCH 06/10] add serial parallel cmp testcase --- python/pyspark/ml/tests.py | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 8b8bcc7b13a3..6d51e87bfe88 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -836,6 +836,27 @@ def 
test_save_load_simple_estimator(self): loadedModel = CrossValidatorModel.load(cvModelPath) self.assertEqual(loadedModel.bestModel.uid, cvModel.bestModel.uid) + def test_parallel_evaluation(self): + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + evaluator = BinaryClassificationEvaluator() + + # test save/load of CrossValidator + cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + cv.setParallelism(1) + cvSerialModel = cv.fit(dataset) + cv.setParallelism(2) + cvParallelModel = cv.fit(dataset) + self.assertEqual(sorted(cvSerialModel.avgMetrics), sorted(cvParallelModel.avgMetrics)) + def test_save_load_nested_estimator(self): temp_path = tempfile.mkdtemp() dataset = self.spark.createDataFrame( @@ -986,6 +1007,24 @@ def test_save_load_simple_estimator(self): loadedModel = TrainValidationSplitModel.load(tvsModelPath) self.assertEqual(loadedModel.bestModel.uid, tvsModel.bestModel.uid) + def test_parallel_evaluation(self): + dataset = self.spark.createDataFrame( + [(Vectors.dense([0.0]), 0.0), + (Vectors.dense([0.4]), 1.0), + (Vectors.dense([0.5]), 0.0), + (Vectors.dense([0.6]), 1.0), + (Vectors.dense([1.0]), 1.0)] * 10, + ["features", "label"]) + lr = LogisticRegression() + grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + evaluator = BinaryClassificationEvaluator() + tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) + tvs.setParallelism(1) + tvsSerialModel = tvs.fit(dataset) + tvs.setParallelism(2) + tvsParallelModel = tvs.fit(dataset) + self.assertEqual(sorted(tvsSerialModel.validationMetrics), sorted(tvsParallelModel.validationMetrics)) + def test_save_load_nested_estimator(self): # This tests saving and loading the trained model only. # Save/load for TrainValidationSplit will be added later: SPARK-13786 From fb0ac045f5ff2697b7dc9ff05695f74251e8cf40 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Wed, 13 Sep 2017 12:40:40 +0800 Subject: [PATCH 07/10] fix py style --- python/pyspark/ml/tests.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 6d51e87bfe88..8a31a0f49d29 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1023,7 +1023,8 @@ def test_parallel_evaluation(self): tvsSerialModel = tvs.fit(dataset) tvs.setParallelism(2) tvsParallelModel = tvs.fit(dataset) - self.assertEqual(sorted(tvsSerialModel.validationMetrics), sorted(tvsParallelModel.validationMetrics)) + self.assertEqual(sorted(tvsSerialModel.validationMetrics), + sorted(tvsParallelModel.validationMetrics)) def test_save_load_nested_estimator(self): # This tests saving and loading the trained model only. 
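The shape that the fitting loop converges on across patches 03-07 is easier to read outside the diff context. Below is a minimal standalone sketch of that pattern (a hypothetical helper; est, epm, eva, train, and validation mirror the locals of _fit and are assumed to be already defined): each param map is fitted and evaluated in a worker thread, and the thread returns its metric rather than mutating a shared metrics list, since ThreadPool.map already collects results in input order.

    from multiprocessing.pool import ThreadPool

    def parallel_fit_metrics(est, epm, eva, train, validation, parallelism):
        # Cap the pool at the number of candidate models; extra threads would idle.
        pool = ThreadPool(processes=min(parallelism, len(epm)))

        def single_train(param_map):
            model = est.fit(train, param_map)
            # Returning the metric (instead of writing into a shared list) means
            # no locking is needed: map() yields results in input order, so the
            # i-th result always corresponds to epm[i].
            return eva.evaluate(model.transform(validation, param_map))

        return pool.map(single_train, epm)
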
From dbe66fb2cb37b581c2fb9b1dab863fbdf2383e10 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Fri, 22 Sep 2017 10:57:44 +0800 Subject: [PATCH 08/10] update --- .../spark/ml/tuning/CrossValidatorSuite.scala | 4 ++-- .../ml/tuning/TrainValidationSplitSuite.scala | 4 ++-- python/pyspark/ml/tests.py | 5 ++--- python/pyspark/ml/tuning.py | 17 +++++------------ 4 files changed, 11 insertions(+), 19 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala index a01744f7b67f..853eeb39bf8d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/CrossValidatorSuite.scala @@ -137,8 +137,8 @@ class CrossValidatorSuite cv.setParallelism(2) val cvParallelModel = cv.fit(dataset) - val serialMetrics = cvSerialModel.avgMetrics.sorted - val parallelMetrics = cvParallelModel.avgMetrics.sorted + val serialMetrics = cvSerialModel.avgMetrics + val parallelMetrics = cvParallelModel.avgMetrics assert(serialMetrics === parallelMetrics) val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression] diff --git a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala index 2ed4fbb601b6..f8d9c66be2c4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tuning/TrainValidationSplitSuite.scala @@ -138,8 +138,8 @@ class TrainValidationSplitSuite cv.setParallelism(2) val cvParallelModel = cv.fit(dataset) - val serialMetrics = cvSerialModel.validationMetrics.sorted - val parallelMetrics = cvParallelModel.validationMetrics.sorted + val serialMetrics = cvSerialModel.validationMetrics + val parallelMetrics = cvParallelModel.validationMetrics assert(serialMetrics === parallelMetrics) val parentSerial = cvSerialModel.bestModel.parent.asInstanceOf[LogisticRegression] diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 8a31a0f49d29..6df3701b6cd7 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -855,7 +855,7 @@ def test_parallel_evaluation(self): cvSerialModel = cv.fit(dataset) cv.setParallelism(2) cvParallelModel = cv.fit(dataset) - self.assertEqual(sorted(cvSerialModel.avgMetrics), sorted(cvParallelModel.avgMetrics)) + self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics) def test_save_load_nested_estimator(self): temp_path = tempfile.mkdtemp() @@ -1023,8 +1023,7 @@ def test_parallel_evaluation(self): tvsSerialModel = tvs.fit(dataset) tvs.setParallelism(2) tvsParallelModel = tvs.fit(dataset) - self.assertEqual(sorted(tvsSerialModel.validationMetrics), - sorted(tvsParallelModel.validationMetrics)) + self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics) def test_save_load_nested_estimator(self): # This tests saving and loading the trained model only. 
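Dropping the sorted() calls above is safe because ThreadPool.map returns its results in input order even when the worker threads finish out of order, so serial and parallel runs yield metrics in the same epm order. A quick self-contained check of that property, using only the standard library:

    from multiprocessing.pool import ThreadPool
    import time

    def slow_identity(x):
        time.sleep(0.1 * (5 - x))  # later inputs finish first
        return x

    pool = ThreadPool(processes=5)
    print(pool.map(slow_identity, range(5)))  # prints [0, 1, 2, 3, 4]
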
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 6bbad27e1e02..646d3c987b23 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -16,14 +16,13 @@ # import itertools import numpy as np - from multiprocessing.pool import ThreadPool from pyspark import since, keyword_only from pyspark.ml import Estimator, Model from pyspark.ml.common import _py2java from pyspark.ml.param import Params, Param, TypeConverters -from pyspark.ml.param.shared import HasSeed, HasParallelism +from pyspark.ml.param.shared import HasParallelism, HasSeed from pyspark.ml.util import * from pyspark.ml.wrapper import JavaParams from pyspark.sql.functions import rand @@ -258,10 +257,7 @@ def _fit(self, dataset): df = dataset.select("*", rand(seed).alias(randCol)) metrics = [0.0] * numModels - parallelism = self.getParallelism() - if parallelism < 1: - raise ValueError("parallelism should >= 1.") - pool = ThreadPool(processes=min(parallelism, numModels)) + pool = ThreadPool(processes=min(self.getParallelism(), numModels)) for i in range(nFolds): validateLB = i * h @@ -277,8 +273,8 @@ def singleTrain(paramMap): return metric currentFoldMetrics = pool.map(singleTrain, epm) - for k in range(numModels): - metrics[k] += (currentFoldMetrics[k] / nFolds) + for j in range(numModels): + metrics[j] += (currentFoldMetrics[j] / nFolds) validation.unpersist() train.unpersist() @@ -530,10 +526,7 @@ def singleTrain(paramMap): metric = eva.evaluate(model.transform(validation, paramMap)) return metric - parallelism = self.getParallelism() - if parallelism < 1: - raise ValueError("parallelism should >= 1.") - pool = ThreadPool(processes=min(parallelism, numModels)) + pool = ThreadPool(processes=min(self.getParallelism(), numModels)) metrics = pool.map(singleTrain, epm) train.unpersist() validation.unpersist() From 93ab39ad0ac3517d383d7f7632d865f34ff86606 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Thu, 28 Sep 2017 17:52:10 +0800 Subject: [PATCH 09/10] update --- python/pyspark/ml/tuning.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py index 646d3c987b23..47351133524e 100644 --- a/python/pyspark/ml/tuning.py +++ b/python/pyspark/ml/tuning.py @@ -326,9 +326,10 @@ def _from_java(cls, java_stage): estimator, epms, evaluator = super(CrossValidator, cls)._from_java_impl(java_stage) numFolds = java_stage.getNumFolds() seed = java_stage.getSeed() + parallelism = java_stage.getParallelism() # Create a new instance of this stage. py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - numFolds=numFolds, seed=seed) + numFolds=numFolds, seed=seed, parallelism=parallelism) py_stage._resetUid(java_stage.uid()) return py_stage @@ -347,6 +348,7 @@ def _to_java(self): _java_obj.setEstimator(estimator) _java_obj.setSeed(self.getSeed()) _java_obj.setNumFolds(self.getNumFolds()) + _java_obj.setParallelism(self.getParallelism()) return _java_obj @@ -579,9 +581,10 @@ def _from_java(cls, java_stage): estimator, epms, evaluator = super(TrainValidationSplit, cls)._from_java_impl(java_stage) trainRatio = java_stage.getTrainRatio() seed = java_stage.getSeed() + parallelism = java_stage.getParallelism() # Create a new instance of this stage. 
py_stage = cls(estimator=estimator, estimatorParamMaps=epms, evaluator=evaluator, - trainRatio=trainRatio, seed=seed) + trainRatio=trainRatio, seed=seed, parallelism=parallelism) py_stage._resetUid(java_stage.uid()) return py_stage @@ -600,6 +603,7 @@ def _to_java(self): _java_obj.setEstimator(estimator) _java_obj.setTrainRatio(self.getTrainRatio()) _java_obj.setSeed(self.getSeed()) + _java_obj.setParallelism(self.getParallelism()) return _java_obj From 67ad3d259faad728d3b14ccccd124eafb930ac3a Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Thu, 26 Oct 2017 11:26:16 +0800 Subject: [PATCH 10/10] improve unit test --- python/pyspark/ml/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 6df3701b6cd7..2f1f3af957e4 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -846,7 +846,7 @@ def test_parallel_evaluation(self): ["features", "label"]) lr = LogisticRegression() - grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build() evaluator = BinaryClassificationEvaluator() # test save/load of CrossValidator @@ -1016,7 +1016,7 @@ def test_parallel_evaluation(self): (Vectors.dense([1.0]), 1.0)] * 10, ["features", "label"]) lr = LogisticRegression() - grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build() + grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build() evaluator = BinaryClassificationEvaluator() tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator) tvs.setParallelism(1)
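
Taken together, the series gives CrossValidator and TrainValidationSplit in PySpark a parallelism param mirroring the existing Scala API. An illustrative end-to-end sketch of the resulting usage, reusing the toy data from the doctests and tests above (assumes an active SparkSession bound to the name spark):

    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

    dataset = spark.createDataFrame(
        [(Vectors.dense([0.0]), 0.0),
         (Vectors.dense([0.4]), 1.0),
         (Vectors.dense([0.5]), 0.0),
         (Vectors.dense([0.6]), 1.0),
         (Vectors.dense([1.0]), 1.0)] * 10,
        ["features", "label"])

    lr = LogisticRegression()
    grid = ParamGridBuilder().addGrid(lr.maxIter, [5, 6]).build()
    evaluator = BinaryClassificationEvaluator()

    # Two candidate models fitted two at a time; avgMetrics should equal
    # those of a serial run with parallelism=1, as the unit tests assert.
    cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                        evaluator=evaluator, parallelism=2)
    cvModel = cv.fit(dataset)
    print(cvModel.avgMetrics)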