19 changes: 16 additions & 3 deletions python/pyspark/java_gateway.py
@@ -37,15 +37,21 @@
from pyspark.util import _exception_message


def launch_gateway(conf=None):
def launch_gateway(conf=None, popen_kwargs=None):
Contributor:
I'd make this _popen_kwargs to indicate its usage is possibly not super supported.

Contributor Author:
Would a comment in the docstring to that effect be better? I haven't seen _var_name used in Python projects to indicate a developer feature. (But of course, maybe I've just not seen it yet!)
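(Note: the suggestion in the thread above is the leading-underscore convention, the weak "internal use" indicator PEP 8 describes for names. Applied here, the hypothetical signature would look like this; the body is elided.)

def launch_gateway(conf=None, _popen_kwargs=None):
    # Illustrative only: the underscore prefix hints that the parameter is
    # internal and may change without notice. As the author notes, this
    # convention is less established for function parameters than for
    # module-level names.
    ...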

"""
launch jvm gateway
:param conf: spark configuration passed to spark-submit
:param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
Contributor:
Mention that this is a developer feature and may change in future versions.

Contributor Author (@parente, Sep 7, 2017):
And ... you already noted what I just commented above. Doh! I'll update the docstring at least.

        the py4j JVM. This is a developer feature intended for use in
        customizing how pyspark interacts with the py4j JVM (e.g., capturing
        stdout/stderr).
    :return: a py4j JavaGateway connected to the driver JVM
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
        gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
        # Process already exists
        proc = None
    else:
        SPARK_HOME = _find_spark_home()
        # Launch the Py4j gateway using Spark's run command so that we pick up the
@@ -76,15 +82,20 @@ def launch_gateway(conf=None):
env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

# Launch the Java gateway.
popen_kwargs = {} if popen_kwargs is None else popen_kwargs
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken
popen_kwargs['stdin'] = PIPE
# We always set the necessary environment variables.
popen_kwargs['env'] = env
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
popen_kwargs['preexec_fn'] = preexec_func
proc = Popen(command, **popen_kwargs)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdin=PIPE, env=env)
proc = Popen(command, **popen_kwargs)

# Wait for the file to appear, or for the process to exit, whichever happens first.
while not proc.poll() and not os.path.isfile(conn_info_file):
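(A minimal, standalone sketch, not Spark code, of the "die when the pipe is broken" mechanism the stdin comment above relies on: a child that blocks on stdin exits once the parent's end of the pipe goes away.)

import subprocess
import sys

# The child blocks reading stdin; EOF (pipe closed, parent gone) unblocks it.
child = subprocess.Popen(
    [sys.executable, "-c", "import sys; sys.stdin.read()"],
    stdin=subprocess.PIPE)

child.stdin.close()  # simulate the parent's end of the pipe going away
child.wait()         # the child sees EOF on stdin and exits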
@@ -119,6 +130,8 @@ def killChild():
    gateway = JavaGateway(
        gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret,
                                             auto_convert=True))
    # Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
    gateway.proc = proc

    # Import the classes used by PySpark
    java_import(gateway.jvm, "org.apache.spark.SparkConf")
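(Taken together, the popen_kwargs parameter and the gateway.proc reference enable the use case named in the docstring. A hedged usage sketch follows; the kwargs chosen are illustrative, not part of the PR.)

import subprocess

from pyspark.java_gateway import launch_gateway

# Capture the JVM's stdout/stderr rather than inheriting the Python process's
# streams; the keys are ordinary subprocess.Popen kwargs. Note launch_gateway
# still sets 'stdin', 'env', and (off Windows) 'preexec_fn' in this dict.
gateway = launch_gateway(popen_kwargs={
    "stdout": subprocess.PIPE,
    "stderr": subprocess.STDOUT,
})

# gateway.proc is the Popen handle (None when an already-running gateway was
# picked up via PYSPARK_GATEWAY_PORT), so the captured output can be read back.
if gateway.proc is not None:
    banner = gateway.proc.stdout.readline()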