From a493c1961829000986446db11ce67f3103a79bea Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 10 Nov 2016 21:46:13 +0530 Subject: [PATCH 1/7] [SPARK-18274] Memory leak in PySpark StringIndexer --- python/pyspark/ml/wrapper.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 25c44b7533c77..2f9526e7ddbc6 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -33,6 +33,10 @@ def __init__(self, java_obj=None): super(JavaWrapper, self).__init__() self._java_obj = java_obj + def __del__(self): + if SparkContext._gateway: + SparkContext._gateway.detach(self._java_obj) + @classmethod def _create_from_java_class(cls, java_class, *args): """ From f25b0992f0a18177c84203419aca8f52eaf1c6df Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 10 Nov 2016 22:07:49 +0530 Subject: [PATCH 2/7] change --- python/pyspark/ml/wrapper.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 2f9526e7ddbc6..520bb3348a775 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -33,10 +33,6 @@ def __init__(self, java_obj=None): super(JavaWrapper, self).__init__() self._java_obj = java_obj - def __del__(self): - if SparkContext._gateway: - SparkContext._gateway.detach(self._java_obj) - @classmethod def _create_from_java_class(cls, java_class, *args): """ @@ -261,6 +257,10 @@ def __init__(self, java_model=None): if java_model is not None: self._resetUid(java_model.uid()) + def __del__(self): + if SparkContext._gateway: + SparkContext._gateway.detach(self._java_obj) + def copy(self, extra=None): """ Creates a copy of this instance with the same uid and some From 3d858a2326809b7e1c679b712d84a8a21767d13c Mon Sep 17 00:00:00 2001 From: jkbradley Date: Thu, 10 Nov 2016 23:55:51 -0800 Subject: [PATCH 3/7] Fixing copy bug (#1) * moved copy from JavaModel to JavaParams. 
mv del from JavaModel to JavaWrapper * added test which fails before this fix --- python/pyspark/ml/tests.py | 18 +++++++++++++++ python/pyspark/ml/wrapper.py | 43 ++++++++++++++++++------------------ 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 9d46cc3b4ae64..75c4f7d8e45fc 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -390,6 +390,24 @@ def test_word2vec_param(self): self.assertEqual(model.getWindowSize(), 6) +class EvaluatorTests(SparkSessionTestCase): + + def test_java_params(self): + """ + This tests a bug fixed by SPARK-18274 which causes multiple copies + of a Params instance in Python to be linked to the same Java instance. + """ + evaluator = RegressionEvaluator(metricName="r2") + df = self.spark.createDataFrame([Row(label=1.0, prediction=1.1)]) + evaluator.evaluate(df) + self.assertEqual(evaluator._java_obj.getMetricName(), "r2") + evaluatorCopy = evaluator.copy({evaluator.metricName: "mae"}) + evaluator.evaluate(df) + evaluatorCopy.evaluate(df) + self.assertEqual(evaluator._java_obj.getMetricName(), "r2") + self.assertEqual(evaluatorCopy._java_obj.getMetricName(), "mae") + + class FeatureTests(SparkSessionTestCase): def test_binarizer(self): diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 520bb3348a775..cd3949edf9874 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -33,6 +33,9 @@ def __init__(self, java_obj=None): super(JavaWrapper, self).__init__() self._java_obj = java_obj + def __del__(self): + SparkContext._active_spark_context._gateway.detach(self._java_obj) + @classmethod def _create_from_java_class(cls, java_class, *args): """ @@ -180,6 +183,24 @@ def __get_class(clazz): % stage_name) return py_stage + def copy(self, extra=None): + """ + Creates a copy of this instance with the same uid and some + extra params. 
This implementation first calls Params.copy and + then make a copy of the companion Java model with extra params. + So both the Python wrapper and the Java model get copied. + + :param extra: Extra parameters to copy to the new instance + :return: Copy of this instance + """ + if extra is None: + extra = dict() + that = super(JavaParams, self).copy(extra) + if self._java_obj is not None: + that._java_obj = self._java_obj.copy(self._empty_java_param_map()) + that._transfer_params_to_java() + return that + @inherit_doc class JavaEstimator(JavaParams, Estimator): @@ -256,25 +277,3 @@ def __init__(self, java_model=None): super(JavaModel, self).__init__(java_model) if java_model is not None: self._resetUid(java_model.uid()) - - def __del__(self): - if SparkContext._gateway: - SparkContext._gateway.detach(self._java_obj) - - def copy(self, extra=None): - """ - Creates a copy of this instance with the same uid and some - extra params. This implementation first calls Params.copy and - then make a copy of the companion Java model with extra params. - So both the Python wrapper and the Java model get copied. 
- - :param extra: Extra parameters to copy to the new instance - :return: Copy of this instance - """ - if extra is None: - extra = dict() - that = super(JavaModel, self).copy(extra) - if self._java_obj is not None: - that._java_obj = self._java_obj.copy(self._empty_java_param_map()) - that._transfer_params_to_java() - return that From 28a940efa9cad55b0d0ee9f9fbe29879046b90a7 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Fri, 11 Nov 2016 13:34:35 +0530 Subject: [PATCH 4/7] check if there is an active_spark_context Without this check, quitting pyspark raises the following error: Exception ignored in: Traceback (most recent call last): File "/Users/pichu/Project/Spark/python/pyspark/ml/wrapper.py", line 37, in __del__ SparkContext._active_spark_context._gateway.detach(self._java_obj) AttributeError: 'NoneType' object has no attribute '_gateway' Exception ignored in: Traceback (most recent call last): File "/Users/pichu/Project/Spark/python/pyspark/ml/wrapper.py", line 37, in __del__ AttributeError: 'NoneType' object has no attribute '_gateway' --- python/pyspark/ml/wrapper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index cd3949edf9874..6607bc101061f 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -34,7 +34,8 @@ def __init__(self, java_obj=None): self._java_obj = java_obj def __del__(self): - SparkContext._active_spark_context._gateway.detach(self._java_obj) + if SparkContext._active_spark_context: + SparkContext._active_spark_context._gateway.detach(self._java_obj) @classmethod def _create_from_java_class(cls, java_class, *args): From 01a80b9d783dca7e74b717b0374305cb4376208a Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Fri, 11 Nov 2016 14:28:51 +0530 Subject: [PATCH 5/7] nit: doc fix --- python/pyspark/ml/wrapper.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index 
6607bc101061f..e6d80b966705c 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -188,8 +188,9 @@ def copy(self, extra=None): """ Creates a copy of this instance with the same uid and some extra params. This implementation first calls Params.copy and - then make a copy of the companion Java model with extra params. - So both the Python wrapper and the Java model get copied. + then make a copy of the companion Java pipeline component with + extra params. So both the Python wrapper and the Java model get + copied. :param extra: Extra parameters to copy to the new instance :return: Copy of this instance From a76a1fb1f10532e0e99592c53fbaa548279f69a8 Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Fri, 11 Nov 2016 14:32:43 +0530 Subject: [PATCH 6/7] doc --- python/pyspark/ml/wrapper.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index e6d80b966705c..e4b6f51b18a02 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -189,8 +189,8 @@ def copy(self, extra=None): Creates a copy of this instance with the same uid and some extra params. This implementation first calls Params.copy and then make a copy of the companion Java pipeline component with - extra params. So both the Python wrapper and the Java model get - copied. + extra params. So both the Python wrapper and the Java pipeline + component get copied. 
:param extra: Extra parameters to copy to the new instance :return: Copy of this instance From 37e83e88fe84dcfdcd04426c85499f7494cd688b Mon Sep 17 00:00:00 2001 From: Sandeep Singh Date: Thu, 1 Dec 2016 18:56:30 +0530 Subject: [PATCH 7/7] move del to javaparams --- python/pyspark/ml/wrapper.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py index e4b6f51b18a02..13b75e9919221 100644 --- a/python/pyspark/ml/wrapper.py +++ b/python/pyspark/ml/wrapper.py @@ -33,10 +33,6 @@ def __init__(self, java_obj=None): super(JavaWrapper, self).__init__() self._java_obj = java_obj - def __del__(self): - if SparkContext._active_spark_context: - SparkContext._active_spark_context._gateway.detach(self._java_obj) - @classmethod def _create_from_java_class(cls, java_class, *args): """ @@ -75,6 +71,10 @@ class JavaParams(JavaWrapper, Params): __metaclass__ = ABCMeta + def __del__(self): + if SparkContext._active_spark_context: + SparkContext._active_spark_context._gateway.detach(self._java_obj) + def _make_java_param_pair(self, param, value): """ Makes a Java parm pair.