From c89146475534d8e284afb24dfeb6d437bde158ec Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 17 Dec 2018 15:52:40 -0600 Subject: [PATCH 1/5] [SPARK-26019][PYSPARK] Allow insecure py4j gateways Spark always creates secure py4j connections between java and python, but it also allows users to pass in their own connection. This restores the ability for users to pass in an _insecure_ connection, though it forces them to set 'spark.python.allowInsecurePy4j=true' and still issues a warning. Added test cases verifying the failure without the extra configuration, and verifying things still work with an insecure configuration (in particular, accumulators, as those were broken with an insecure py4j gateway before). --- .../api/python/PythonGatewayServer.scala | 11 +++++-- .../apache/spark/api/python/PythonRDD.scala | 6 ++-- python/pyspark/accumulators.py | 7 +++-- python/pyspark/context.py | 11 +++++++ python/pyspark/java_gateway.py | 19 ++++++++++-- python/pyspark/tests.py | 29 +++++++++++++++++++ 6 files changed, 72 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala index 9ddc4a4910180..a8256e6491f4c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala @@ -43,12 +43,17 @@ private[spark] object PythonGatewayServer extends Logging { // with the same secret, in case the app needs callbacks from the JVM to the underlying // python processes. val localhost = InetAddress.getLoopbackAddress() - val gatewayServer: GatewayServer = new GatewayServer.GatewayServerBuilder() - .authToken(secret) + val builder = new GatewayServer.GatewayServerBuilder() .javaPort(0) .javaAddress(localhost) .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret) - .build() + if (sys.env.getOrElse("_PYSPARK_INSECURE_GATEWAY", "0") != "1") { + builder.authToken(secret) + } else { + assert(sys.env.getOrElse("SPARK_TESTING", "0") == "1", + "Creating insecure java gateways only allowed for testing") + } + val gatewayServer: GatewayServer = builder.build() gatewayServer.start() val boundPort: Int = gatewayServer.getListeningPort diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 5ed5070558af7..81494b167af50 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -616,8 +616,10 @@ private[spark] class PythonAccumulatorV2( if (socket == null || socket.isClosed) { socket = new Socket(serverHost, serverPort) logInfo(s"Connected to AccumulatorServer at host: $serverHost port: $serverPort") - // send the secret just for the initial authentication when opening a new connection - socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) + if (secretToken != null) { + // send the secret just for the initial authentication when opening a new connection + socket.getOutputStream.write(secretToken.getBytes(StandardCharsets.UTF_8)) + } } socket } diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 00ec094e7e3b4..105ef7b325ed1 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -262,9 +262,10 @@ def authenticate_and_accum_updates(): raise Exception( "The value of the provided token to the AccumulatorServer is 
not correct.") - # first we keep polling till we've received the authentication token - poll(authenticate_and_accum_updates) - # now we've authenticated, don't need to check for the token anymore + if auth_token: + # first we keep polling till we've received the authentication token + poll(authenticate_and_accum_updates) + # now we've authenticated if needed, don't need to check for the token anymore poll(accum_updates) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 0924d3d95f044..94a6e4ce8e361 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -112,6 +112,17 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, ValueError:... """ self._callsite = first_spark_call() or CallSite(None, None, None) + if gateway != None and gateway.gateway_parameters.auth_token == None: + if conf and conf.get("spark.python.allowInsecurePy4j", "false") == "true": + print("****BAM****") + warnings.warn("You are passing in an insecure py4j gateway. This " + "presents a security risk, and will be completely forbidden in Spark 3.0") + else: + raise Exception("You are trying to pass an insecure py4j gateway to spark. This" + " presents a security risk. If you are sure you understand and accept this" + " risk, you can add the conf 'spark.python.allowInsecurePy4j=true', but" + " note this option will be removed in Spark 3.0") + SparkContext._ensure_initialized(self, gateway=gateway, conf=conf) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index c8c5f801f89bb..ce261b9fcad04 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -43,6 +43,16 @@ def launch_gateway(conf=None): :param conf: spark configuration passed to spark-submit :return: """ + return _launch_gateway(conf) + +def _launch_gateway(conf=None, insecure=False): + """ + launch jvm gateway + :param conf: spark configuration passed to spark-submit + :return: + """ + if insecure and not os.environ.get("SPARK_TESTING", "0") == "1": + raise Exception("creating insecure gateways is only for testing") if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"] @@ -74,6 +84,8 @@ def launch_gateway(conf=None): env = dict(os.environ) env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file + if insecure: + env["_PYSPARK_INSECURE_GATEWAY"] = "1" # Launch the Java gateway. 
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken @@ -116,9 +128,10 @@ def killChild(): atexit.register(killChild) # Connect to the gateway - gateway = JavaGateway( - gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret, - auto_convert=True)) + gateway_params = GatewayParameters(port=gateway_port, auto_convert=True) + if not insecure: + gateway_params.auth_token=gateway_secret + gateway = JavaGateway(gateway_parameters=gateway_params) # Import the classes used by PySpark java_import(gateway.jvm, "org.apache.spark.SparkConf") diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 131c51e108cad..904c2f241d435 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -61,6 +61,7 @@ from pyspark import keyword_only from pyspark.conf import SparkConf from pyspark.context import SparkContext +from pyspark.java_gateway import _launch_gateway from pyspark.rdd import RDD from pyspark.files import SparkFiles from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer, PickleSerializer, \ @@ -2381,6 +2382,34 @@ def test_startTime(self): with SparkContext() as sc: self.assertGreater(sc.startTime, 0) + def test_forbid_insecure_gateway(self): + # By default, we fail immediately if you try to create a SparkContext + # with an insecure gateway + gateway = _launch_gateway(insecure=True) + with self.assertRaises(Exception) as context: + SparkContext(gateway=gateway) + self.assertIn("insecure py4j gateway", context.exception.message) + self.assertIn("spark.python.allowInsecurePy4j", context.exception.message) + self.assertIn("removed in Spark 3.0", context.exception.message) + + def test_allow_insecure_gateway_with_conf(self): + with SparkContext._lock: + SparkContext._gateway = None + SparkContext._jvm = None + gateway = _launch_gateway(insecure=True) + conf = SparkConf() + conf.set("spark.python.allowInsecurePy4j", "true") + print("entering allow insecure test") + with SparkContext(conf=conf, gateway=gateway) as sc: + print("sc created, about to create accum") + a = sc.accumulator(1) + rdd = sc.parallelize([1,2,3]) + def f(x): + a.add(x) + rdd.foreach(f) + self.assertEqual(7, a.value) + print("exiting allow insecure test") + class ConfTests(unittest.TestCase): def test_memory_conf(self): From 7b2d223f44ff24df51b737496a22daa15c2ecd7c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 17 Dec 2018 16:21:17 -0600 Subject: [PATCH 2/5] style --- python/pyspark/context.py | 11 ++++++----- python/pyspark/java_gateway.py | 3 ++- python/pyspark/tests.py | 4 +++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 94a6e4ce8e361..61804aea48e56 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -112,13 +112,14 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, ValueError:... """ self._callsite = first_spark_call() or CallSite(None, None, None) - if gateway != None and gateway.gateway_parameters.auth_token == None: + if gateway is not None and gateway.gateway_parameters.auth_token is None: if conf and conf.get("spark.python.allowInsecurePy4j", "false") == "true": - print("****BAM****") - warnings.warn("You are passing in an insecure py4j gateway. This " - "presents a security risk, and will be completely forbidden in Spark 3.0") + warnings.warn( + "You are passing in an insecure py4j gateway. 
This " + "presents a security risk, and will be completely forbidden in Spark 3.0") else: - raise Exception("You are trying to pass an insecure py4j gateway to spark. This" + raise Exception( + "You are trying to pass an insecure py4j gateway to spark. This" " presents a security risk. If you are sure you understand and accept this" " risk, you can add the conf 'spark.python.allowInsecurePy4j=true', but" " note this option will be removed in Spark 3.0") diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index ce261b9fcad04..7c795156e374e 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -45,6 +45,7 @@ def launch_gateway(conf=None): """ return _launch_gateway(conf) + def _launch_gateway(conf=None, insecure=False): """ launch jvm gateway @@ -130,7 +131,7 @@ def killChild(): # Connect to the gateway gateway_params = GatewayParameters(port=gateway_port, auto_convert=True) if not insecure: - gateway_params.auth_token=gateway_secret + gateway_params.auth_token = gateway_secret gateway = JavaGateway(gateway_parameters=gateway_params) # Import the classes used by PySpark diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 904c2f241d435..431a865e10609 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2403,9 +2403,11 @@ def test_allow_insecure_gateway_with_conf(self): with SparkContext(conf=conf, gateway=gateway) as sc: print("sc created, about to create accum") a = sc.accumulator(1) - rdd = sc.parallelize([1,2,3]) + rdd = sc.parallelize([1, 2, 3]) + def f(x): a.add(x) + rdd.foreach(f) self.assertEqual(7, a.value) print("exiting allow insecure test") From e83b160bc1222f5f20dce6490e25afc04d509244 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 17 Dec 2018 22:33:38 -0600 Subject: [PATCH 3/5] review feedback --- .../spark/api/python/PythonGatewayServer.scala | 2 +- python/pyspark/context.py | 4 ++-- python/pyspark/java_gateway.py | 4 ++-- python/pyspark/tests.py | 14 ++++---------- 4 files changed, 9 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala index a8256e6491f4c..656f72e8f11e9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala @@ -51,7 +51,7 @@ private[spark] object PythonGatewayServer extends Logging { builder.authToken(secret) } else { assert(sys.env.getOrElse("SPARK_TESTING", "0") == "1", - "Creating insecure java gateways only allowed for testing") + "Creating insecure Java gateways only allowed for testing") } val gatewayServer: GatewayServer = builder.build() diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 61804aea48e56..7e3e16979860e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -115,11 +115,11 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, if gateway is not None and gateway.gateway_parameters.auth_token is None: if conf and conf.get("spark.python.allowInsecurePy4j", "false") == "true": warnings.warn( - "You are passing in an insecure py4j gateway. This " + "You are passing in an insecure Py4j gateway. This " "presents a security risk, and will be completely forbidden in Spark 3.0") else: raise Exception( - "You are trying to pass an insecure py4j gateway to spark. This" + "You are trying to pass an insecure Py4j gateway to Spark. 
This" " presents a security risk. If you are sure you understand and accept this" " risk, you can add the conf 'spark.python.allowInsecurePy4j=true', but" " note this option will be removed in Spark 3.0") diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 7c795156e374e..aba88576ef47e 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -41,7 +41,7 @@ def launch_gateway(conf=None): """ launch jvm gateway :param conf: spark configuration passed to spark-submit - :return: + :return: a JVM gateway """ return _launch_gateway(conf) @@ -50,7 +50,7 @@ def _launch_gateway(conf=None, insecure=False): """ launch jvm gateway :param conf: spark configuration passed to spark-submit - :return: + :return: a JVM gateway """ if insecure and not os.environ.get("SPARK_TESTING", "0") == "1": raise Exception("creating insecure gateways is only for testing") diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 431a865e10609..c9a2dafeead05 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2388,9 +2388,9 @@ def test_forbid_insecure_gateway(self): gateway = _launch_gateway(insecure=True) with self.assertRaises(Exception) as context: SparkContext(gateway=gateway) - self.assertIn("insecure py4j gateway", context.exception.message) - self.assertIn("spark.python.allowInsecurePy4j", context.exception.message) - self.assertIn("removed in Spark 3.0", context.exception.message) + self.assertIn("insecure Py4j gateway", str(context.exception)) + self.assertIn("spark.python.allowInsecurePy4j", str(context.exception)) + self.assertIn("removed in Spark 3.0", str(context.exception)) def test_allow_insecure_gateway_with_conf(self): with SparkContext._lock: @@ -2399,18 +2399,12 @@ def test_allow_insecure_gateway_with_conf(self): gateway = _launch_gateway(insecure=True) conf = SparkConf() conf.set("spark.python.allowInsecurePy4j", "true") - print("entering allow insecure test") with SparkContext(conf=conf, gateway=gateway) as sc: print("sc created, about to create accum") a = sc.accumulator(1) rdd = sc.parallelize([1, 2, 3]) - - def f(x): - a.add(x) - - rdd.foreach(f) + rdd.foreach(lambda x: a.add(x)) self.assertEqual(7, a.value) - print("exiting allow insecure test") class ConfTests(unittest.TestCase): From 9cc545b278406fa57502af89bd4a4eb6a0ea2f04 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 19 Dec 2018 16:44:18 -0600 Subject: [PATCH 4/5] updated to use environment variable to allow insecure gateways --- .../api/python/PythonGatewayServer.scala | 2 +- python/pyspark/context.py | 6 ++++-- python/pyspark/java_gateway.py | 3 ++- python/pyspark/tests.py | 20 ++++++++++--------- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala index 656f72e8f11e9..17c65f6170d67 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala @@ -47,7 +47,7 @@ private[spark] object PythonGatewayServer extends Logging { .javaPort(0) .javaAddress(localhost) .callbackClient(GatewayServer.DEFAULT_PYTHON_PORT, localhost, secret) - if (sys.env.getOrElse("_PYSPARK_INSECURE_GATEWAY", "0") != "1") { + if (sys.env.getOrElse("_PYSPARK_CREATE_INSECURE_GATEWAY", "0") != "1") { builder.authToken(secret) } else { assert(sys.env.getOrElse("SPARK_TESTING", "0") == "1", diff --git 
a/python/pyspark/context.py b/python/pyspark/context.py index 7e3e16979860e..ad4333b5d18dd 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -113,7 +113,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, """ self._callsite = first_spark_call() or CallSite(None, None, None) if gateway is not None and gateway.gateway_parameters.auth_token is None: - if conf and conf.get("spark.python.allowInsecurePy4j", "false") == "true": + allow_insecure_env = os.environ.get("PYSPARK_ALLOW_INSECURE_GATEWAY", "0") + if allow_insecure_env == "1" or allow_insecure_env.lower() == "true": warnings.warn( "You are passing in an insecure Py4j gateway. This " "presents a security risk, and will be completely forbidden in Spark 3.0") @@ -121,7 +122,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, raise Exception( "You are trying to pass an insecure Py4j gateway to Spark. This" " presents a security risk. If you are sure you understand and accept this" - " risk, you can add the conf 'spark.python.allowInsecurePy4j=true', but" + " risk, you can set the environment variable" + " 'PYSPARK_ALLOW_INSECURE_GATEWAY=1', but" " note this option will be removed in Spark 3.0") SparkContext._ensure_initialized(self, gateway=gateway, conf=conf) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index aba88576ef47e..3ef498b62225d 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -50,6 +50,7 @@ def _launch_gateway(conf=None, insecure=False): """ launch jvm gateway :param conf: spark configuration passed to spark-submit + :param insecure: True to create an insecure gateway; only for testing :return: a JVM gateway """ if insecure and not os.environ.get("SPARK_TESTING", "0") == "1": @@ -86,7 +87,7 @@ def _launch_gateway(conf=None, insecure=False): env = dict(os.environ) env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file if insecure: - env["_PYSPARK_INSECURE_GATEWAY"] = "1" + env["_PYSPARK_CREATE_INSECURE_GATEWAY"] = "1" # Launch the Java gateway. 
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c9a2dafeead05..6267b682f3e2b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2389,7 +2389,7 @@ def test_forbid_insecure_gateway(self): with self.assertRaises(Exception) as context: SparkContext(gateway=gateway) self.assertIn("insecure Py4j gateway", str(context.exception)) - self.assertIn("spark.python.allowInsecurePy4j", str(context.exception)) + self.assertIn("PYSPARK_ALLOW_INSECURE_GATEWAY", str(context.exception)) self.assertIn("removed in Spark 3.0", str(context.exception)) def test_allow_insecure_gateway_with_conf(self): @@ -2397,14 +2397,16 @@ def test_allow_insecure_gateway_with_conf(self): SparkContext._gateway = None SparkContext._jvm = None gateway = _launch_gateway(insecure=True) - conf = SparkConf() - conf.set("spark.python.allowInsecurePy4j", "true") - with SparkContext(conf=conf, gateway=gateway) as sc: - print("sc created, about to create accum") - a = sc.accumulator(1) - rdd = sc.parallelize([1, 2, 3]) - rdd.foreach(lambda x: a.add(x)) - self.assertEqual(7, a.value) + try: + os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1" + with SparkContext(gateway=gateway) as sc: + print("sc created, about to create accum") + a = sc.accumulator(1) + rdd = sc.parallelize([1, 2, 3]) + rdd.foreach(lambda x: a.add(x)) + self.assertEqual(7, a.value) + finally: + os.environ.pop("PYSPARK_ALLOW_INSECURE_GATEWAY", None) class ConfTests(unittest.TestCase): From c6e0811b7eeaf85a7ec9e60d9b7447cd69211c9d Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Wed, 2 Jan 2019 12:48:46 -0600 Subject: [PATCH 5/5] style issues --- python/pyspark/accumulators.py | 2 +- python/pyspark/context.py | 2 +- python/pyspark/java_gateway.py | 4 ++-- python/pyspark/tests.py | 17 +++++++++++------ 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 105ef7b325ed1..855d8fb4a859f 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -262,7 +262,7 @@ def authenticate_and_accum_updates(): raise Exception( "The value of the provided token to the AccumulatorServer is not correct.") - if auth_token: + if auth_token is not None: # first we keep polling till we've received the authentication token poll(authenticate_and_accum_updates) # now we've authenticated if needed, don't need to check for the token anymore diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ad4333b5d18dd..6d99e9823f001 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -119,7 +119,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, "You are passing in an insecure Py4j gateway. This " "presents a security risk, and will be completely forbidden in Spark 3.0") else: - raise Exception( + raise ValueError( "You are trying to pass an insecure Py4j gateway to Spark. This" " presents a security risk. 
If you are sure you understand and accept this" " risk, you can set the environment variable" diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 3ef498b62225d..feb6b7bd6aa3d 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -53,8 +53,8 @@ def _launch_gateway(conf=None, insecure=False): :param insecure: True to create an insecure gateway; only for testing :return: a JVM gateway """ - if insecure and not os.environ.get("SPARK_TESTING", "0") == "1": - raise Exception("creating insecure gateways is only for testing") + if insecure and os.environ.get("SPARK_TESTING", "0") != "1": + raise ValueError("creating insecure gateways is only for testing") if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"] diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 6267b682f3e2b..a2d825ba36256 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -2386,11 +2386,17 @@ def test_forbid_insecure_gateway(self): # By default, we fail immediately if you try to create a SparkContext # with an insecure gateway gateway = _launch_gateway(insecure=True) - with self.assertRaises(Exception) as context: - SparkContext(gateway=gateway) - self.assertIn("insecure Py4j gateway", str(context.exception)) - self.assertIn("PYSPARK_ALLOW_INSECURE_GATEWAY", str(context.exception)) - self.assertIn("removed in Spark 3.0", str(context.exception)) + log4j = gateway.jvm.org.apache.log4j + old_level = log4j.LogManager.getRootLogger().getLevel() + try: + log4j.LogManager.getRootLogger().setLevel(log4j.Level.FATAL) + with self.assertRaises(Exception) as context: + SparkContext(gateway=gateway) + self.assertIn("insecure Py4j gateway", str(context.exception)) + self.assertIn("PYSPARK_ALLOW_INSECURE_GATEWAY", str(context.exception)) + self.assertIn("removed in Spark 3.0", str(context.exception)) + finally: + log4j.LogManager.getRootLogger().setLevel(old_level) def test_allow_insecure_gateway_with_conf(self): with SparkContext._lock: @@ -2400,7 +2406,6 @@ def test_allow_insecure_gateway_with_conf(self): try: os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1" with SparkContext(gateway=gateway) as sc: - print("sc created, about to create accum") a = sc.accumulator(1) rdd = sc.parallelize([1, 2, 3]) rdd.foreach(lambda x: a.add(x))
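---

Editor's note (not part of the patch series above): a minimal sketch of how a user-managed, unauthenticated Py4J gateway would be passed to SparkContext once this series is applied. The port number and the pre-existing JVM-side GatewayServer are assumptions for illustration only; the opt-in shown is the PYSPARK_ALLOW_INSECURE_GATEWAY environment variable introduced in patch 4, which replaces the spark.python.allowInsecurePy4j conf from patch 1, and the ValueError comes from patch 5.

    import os
    from py4j.java_gateway import JavaGateway, GatewayParameters
    from pyspark import SparkContext

    # Assumed: a GatewayServer built *without* an auth token is already
    # listening on this port in a driver JVM (25333 is an illustrative value).
    gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=25333, auto_convert=True))

    # Explicit opt-in; without it, SparkContext raises ValueError because
    # gateway.gateway_parameters.auth_token is None.
    os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"

    # Still emits a warning that insecure gateways will be forbidden in Spark 3.0.
    sc = SparkContext(gateway=gateway)

With the opt-in set, accumulator updates also flow over the unauthenticated local socket, which is what the test_allow_insecure_gateway_with_conf test above exercises via sc.accumulator and rdd.foreach.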