From d00b6dfb3022e914b1af6680d9cfeb69466cbbc3 Mon Sep 17 00:00:00 2001 From: Peter Parente Date: Sat, 17 Jun 2017 17:17:45 -0400 Subject: [PATCH 1/4] SPARK-21094: Add popen_kwargs to launch_gateway Allow the caller to customize the py4j JVM subprocess pipes and buffers for programmatic capturing of its output. --- python/pyspark/java_gateway.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index c8c5f801f89b..220aa4c45f00 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -37,15 +37,19 @@ from pyspark.util import _exception_message -def launch_gateway(conf=None): +def launch_gateway(conf=None, popen_kwargs=None): """ launch jvm gateway :param conf: spark configuration passed to spark-submit + :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning + the py4j JVM. :return: """ if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"] + # Process already exists + proc = None else: SPARK_HOME = _find_spark_home() # Launch the Py4j gateway using Spark's run command so that we pick up the @@ -119,6 +123,8 @@ def killChild(): gateway = JavaGateway( gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret, auto_convert=True)) + # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr) + gateway.proc = proc # Import the classes used by PySpark java_import(gateway.jvm, "org.apache.spark.SparkConf") From 464f25147473b004b50af87d3f10068c73adb248 Mon Sep 17 00:00:00 2001 From: Peter Parente Date: Fri, 8 Sep 2017 22:20:19 -0400 Subject: [PATCH 2/4] SPARK-21094: Add note about developer feature --- python/pyspark/java_gateway.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 220aa4c45f00..4e9a4f6729d6 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -42,7 +42,9 @@ def launch_gateway(conf=None, popen_kwargs=None): launch jvm gateway :param conf: spark configuration passed to spark-submit :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning - the py4j JVM. + the py4j JVM. This is a developer feature intended for useful in + customizing how pyspark interacts with the py4j JVM (e.g., capturing + stdout/stderr). :return: """ if "PYSPARK_GATEWAY_PORT" in os.environ: From fa63ba7197bc5b7844716cf2efbe62a3f652a7e7 Mon Sep 17 00:00:00 2001 From: Peter Parente Date: Sun, 28 Oct 2018 21:41:15 -0400 Subject: [PATCH 3/4] SPARK-21094: Rebase on master --- python/pyspark/java_gateway.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 4e9a4f6729d6..3fa23dd12eca 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -82,15 +82,20 @@ def launch_gateway(conf=None, popen_kwargs=None): env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file # Launch the Java gateway. + popen_kwargs = {} if popen_kwargs is None else popen_kwargs # We open a pipe to stdin so that the Java gateway can die when the pipe is broken + popen_kwargs['stdin'] = PIPE + # We always set the necessary environment variables. + popen_kwargs['env'] = env if not on_windows: # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env) + popen_kwargs['preexec_fn'] = preexec_func + proc = Popen(command, **popen_kwargs) else: # preexec_fn not supported on Windows - proc = Popen(command, stdin=PIPE, env=env) + proc = Popen(command, **popen_kwargs) # Wait for the file to appear, or for the process to exit, whichever happens first. while not proc.poll() and not os.path.isfile(conn_info_file): From ea267c68c805951c5ee2fb4fccd9f8fb4a288297 Mon Sep 17 00:00:00 2001 From: Peter Parente Date: Sun, 28 Oct 2018 21:43:36 -0400 Subject: [PATCH 4/4] SPARK-21094: Fix docstring typo --- python/pyspark/java_gateway.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 3fa23dd12eca..1905dbfc5734 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -42,7 +42,7 @@ def launch_gateway(conf=None, popen_kwargs=None): launch jvm gateway :param conf: spark configuration passed to spark-submit :param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning - the py4j JVM. This is a developer feature intended for useful in + the py4j JVM. This is a developer feature intended for use in customizing how pyspark interacts with the py4j JVM (e.g., capturing stdout/stderr). :return: