From 35eeafe1a25b84d20ec1c11c59a9809ff9bcd0e7 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 23 Jan 2017 13:10:31 -0800 Subject: [PATCH 1/3] [SPARK-19307][pyspark] Make sure user conf is propagated to SparkContext. The code was failing to propagate the user conf in the case where the JVM was already initialized, which happens when a user submits a python script via spark-submit. Tested with new unit test and by running a python script in a real cluster. --- python/pyspark/context.py | 3 +++ python/pyspark/tests.py | 20 ++++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 5c4e79cb0499..ac4b2b035f5c 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -132,6 +132,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._conf = conf else: self._conf = SparkConf(_jvm=SparkContext._jvm) + if conf is not None: + for k, v in conf.getAll(): + self._conf.set(k, v) self._batchSize = batchSize # -1 represents an unlimited batch size self._unbatched_serializer = serializer diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c383d9ab6767..96081dda5c67 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2035,6 +2035,26 @@ def test_single_script_on_cluster(self): self.assertEqual(0, proc.returncode) self.assertIn("[2, 4, 6]", out.decode('utf-8')) + def test_user_configuration(self): + """Make sure user configuration is respected (SPARK-19307)""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkConf, SparkContext + | + |conf = SparkConf().set("spark.test_config", "1") + |sc = SparkContext(conf = conf) + |try: + | if sc._conf.get("spark.test_config") != "1": + | raise Exception("Cannot find spark.test_config in SparkContext's conf.") + |finally: + | sc.stop() + """) + try: + subprocess.check_output([self.sparkSubmit, "--master", "local", script], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + # This gives us a better error message for debugging. + self.fail("Test exited with {0}, output:\n{1}".format(e.returncode, e.output)) + class ContextTests(unittest.TestCase): From cef51362bea2c76ce5c699d732df6dfca70dad94 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 23 Jan 2017 14:28:59 -0800 Subject: [PATCH 2/3] Fix style. --- python/pyspark/tests.py | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 96081dda5c67..6527d54433ca 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2036,24 +2036,25 @@ def test_single_script_on_cluster(self): self.assertIn("[2, 4, 6]", out.decode('utf-8')) def test_user_configuration(self): - """Make sure user configuration is respected (SPARK-19307)""" - script = self.createTempFile("test.py", """ - |from pyspark import SparkConf, SparkContext - | - |conf = SparkConf().set("spark.test_config", "1") - |sc = SparkContext(conf = conf) - |try: - | if sc._conf.get("spark.test_config") != "1": - | raise Exception("Cannot find spark.test_config in SparkContext's conf.") - |finally: - | sc.stop() - """) - try: - subprocess.check_output([self.sparkSubmit, "--master", "local", script], - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as e: - # This gives us a better error message for debugging. - self.fail("Test exited with {0}, output:\n{1}".format(e.returncode, e.output)) + """Make sure user configuration is respected (SPARK-19307)""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkConf, SparkContext + | + |conf = SparkConf().set("spark.test_config", "1") + |sc = SparkContext(conf = conf) + |try: + | if sc._conf.get("spark.test_config") != "1": + | raise Exception("Cannot find spark.test_config in SparkContext's conf.") + |finally: + | sc.stop() + """) + try: + subprocess.check_output( + [self.sparkSubmit, "--master", "local", script], + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + # This gives us a better error message for debugging. + self.fail("Test exited with {0}, output:\n{1}".format(e.returncode, e.output)) class ContextTests(unittest.TestCase): From ef8349a6effd089980f85d829257d57dca7d79e6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 23 Jan 2017 17:11:28 -0800 Subject: [PATCH 3/3] Avoid check_output to make older python happy. --- python/pyspark/tests.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 6527d54433ca..e908b1e739bb 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2048,13 +2048,12 @@ def test_user_configuration(self): |finally: | sc.stop() """) - try: - subprocess.check_output( - [self.sparkSubmit, "--master", "local", script], - stderr=subprocess.STDOUT) - except subprocess.CalledProcessError as e: - # This gives us a better error message for debugging. - self.fail("Test exited with {0}, output:\n{1}".format(e.returncode, e.output)) + proc = subprocess.Popen( + [self.sparkSubmit, "--master", "local", script], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode, msg="Process failed with error:\n {0}".format(out)) class ContextTests(unittest.TestCase):