From 83ebe601032867327988940073de4ee08a42c3fe Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Mon, 25 Aug 2014 23:28:48 -0700
Subject: [PATCH 01/10] Parse special driver configs in Windows (broken)

Note that this is still currently broken. There is an issue with using
SparkSubmitDriverBootstrapper with Windows; stdin is not being picked up
properly by the SparkSubmit subprocess. This must be fixed before the PR
is merged.
---
 bin/spark-class2.cmd | 46 +++++++++++++++++++++++++++++++++++++-------
 bin/spark-submit.cmd | 33 +++++++++++++++++++++----------
 2 files changed, 62 insertions(+), 17 deletions(-)
 mode change 100755 => 100644 bin/spark-class2.cmd

diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
old mode 100755
new mode 100644
index e2c5f9c38518..3d97caa8de17
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem

+rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 setlocal enabledelayedexpansion

 set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given

 if not "x%SPARK_MEM%"=="x" (
   echo Warning: SPARK_MEM is deprecated, please use a more specific config option
-  echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+  echo e.g., spark.executor.memory or spark.driver.memory.
 )

 rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -67,10 +69,18 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
   if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%

-rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.repl.Main" (
-  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+rem The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
+  if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
+  ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH%
+  )
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+  if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
 ) else (
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
@@ -80,9 +90,9 @@ rem Set JAVA_OPTS to be able to load native libraries and to set heap size
 for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
 for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
 if "%jversion%" geq "1.8.0" (
-  set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+  set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 ) else (
-  set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+  set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 )

 rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
@@ -115,5 +125,27 @@ rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

-"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
+rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
+rem the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
+rem to prepare the launch environment of this driver JVM.
+
+rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
+rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
+rem be done here because the Windows "shift" command does not work in a conditional block.
+set BOOTSTRAP_ARGS=
+shift
+:start_parse
+if "%~1" == "" goto end_parse
+set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
+shift
+goto start_parse
+:end_parse
+
+if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
+  set SPARK_CLASS=1
+  "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
+) else (
+  "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+)
 :exit

diff --git a/bin/spark-submit.cmd b/bin/spark-submit.cmd
index 6eb702ed8c56..8f79b0837130 100644
--- a/bin/spark-submit.cmd
+++ b/bin/spark-submit.cmd
@@ -17,23 +17,29 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem

+rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 set SPARK_HOME=%~dp0..
 set ORIG_ARGS=%*

-rem Clear the values of all variables used
-set DEPLOY_MODE=
-set DRIVER_MEMORY=
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
 set SPARK_SUBMIT_LIBRARY_PATH=
 set SPARK_SUBMIT_CLASSPATH=
 set SPARK_SUBMIT_OPTS=
 set SPARK_DRIVER_MEMORY=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=

 :loop
 if [%1] == [] goto continue
   if [%1] == [--deploy-mode] (
-    set DEPLOY_MODE=%2
+    set SPARK_SUBMIT_DEPLOY_MODE=%2
+  ) else if [%1] == [--properties-file] (
+    set SPARK_SUBMIT_PROPERTIES_FILE=%2
   ) else if [%1] == [--driver-memory] (
-    set DRIVER_MEMORY=%2
+    set SPARK_SUBMIT_DRIVER_MEMORY=%2
   ) else if [%1] == [--driver-library-path] (
     set SPARK_SUBMIT_LIBRARY_PATH=%2
   ) else if [%1] == [--driver-class-path] (
@@ -45,12 +51,19 @@ if [%1] == [] goto continue
   goto loop
 :continue

-if [%DEPLOY_MODE%] == [] (
-  set DEPLOY_MODE=client
-)
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.

-if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
-  set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+  if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+    rem Parse the properties file only if the special configs exist
+    for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
+      %SPARK_SUBMIT_PROPERTIES_FILE%') do (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
+  )
 )

 cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%

From f97daa20c184fb4b68d7ff5de1172a8e94e38b2f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 17:23:02 -0700
Subject: [PATCH 02/10] Fix Windows spark shell stdin issue

It turns out that java.lang.Process reads directly from the parent
process' stdin on Windows. This means we should avoid spawning a thread
that also attempts to redirect System.in to the subprocess (in vain) and
contends with the subprocess in reading System.in.

This raises an issue with knowing when to terminate the JVM in the
PySpark shell, however, where Java itself is a subprocess of Python. We
previously relied on the Java process killing itself on broken pipe, but
this mechanism is not available on Windows since we no longer read from
System.in for the EOF. Instead, in this environment we rely on Python's
shutdown hook to kill the child process.
---
 .../SparkSubmitDriverBootstrapper.scala | 19 +++++++++++++------
 python/pyspark/java_gateway.py          |  8 ++++++++
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index af607e6a4a06..c4ab1beae83d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val process = builder.start()

     // Redirect stdin, stdout, and stderr to/from the child JVM
-    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
-    stdinThread.start()
     stdoutThread.start()
     stderrThread.start()

-    // Terminate on broken pipe, which signals that the parent process has exited. This is
-    // important for the PySpark shell, where Spark submit itself is a python subprocess.
-    stdinThread.join()
-    process.destroy()
+    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
+    // a thread that also reads from stdin and contends with the subprocess.
+    if (Utils.isWindows) {
+      // For the PySpark shell, the termination of this process is handled in java_gateway.py
+      process.waitFor()
+    } else {
+      // Terminate on broken pipe, which signals that the parent process has exited. This is
+      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      stdinThread.start()
+      stdinThread.join()
+      process.destroy()
+    }
   }

 }

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6f4f62f23bc4..0fa9a560c830 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #

+import atexit
 import os
 import sys
 import signal
@@ -69,6 +70,13 @@ def preexec_func():
             error_msg += "--------------------------------------------------------------\n"
         raise Exception(error_msg)

+        # Ensure the Java subprocess does not linger after python exists. Note that this is best
+        # effort and intended mainly for Windows. In UNIX-based systems, the child process can kill
+        # itself on broken pipe (i.e. when the parent process' stdin sends an EOF). In Windows,
+        # however, this is not possible because java.lang.Process reads directly from the parent
+        # process' stdin, contending with any opportunity to read an EOF from the parent.
+        atexit.register(lambda: proc.kill())
+
         # Create a thread to echo output from the GatewayServer, which is required
         # for Java log output to show up:
         class EchoOutputThread(Thread):

From 35caecc899796da1ad4851185644ff591d479270 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 18:19:40 -0700
Subject: [PATCH 03/10] In Windows, actually kill Java processes on exit

Previously we only killed the surface-level "spark-submit.cmd" command.
We need to go all the way and kill its children too.
---
 python/pyspark/java_gateway.py | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 0fa9a560c830..b4873ae98e4c 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -70,12 +70,21 @@ def preexec_func():
             error_msg += "--------------------------------------------------------------\n"
         raise Exception(error_msg)

-        # Ensure the Java subprocess does not linger after python exists. Note that this is best
-        # effort and intended mainly for Windows. In UNIX-based systems, the child process can kill
-        # itself on broken pipe (i.e. when the parent process' stdin sends an EOF). In Windows,
-        # however, this is not possible because java.lang.Process reads directly from the parent
-        # process' stdin, contending with any opportunity to read an EOF from the parent.
-        atexit.register(lambda: proc.kill())
+        # Ensure the Java child processes do not linger after python has exited in Windows.
+        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
+        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
+        # because java.lang.Process reads directly from the parent process' stdin, contending
+        # with any opportunity to read an EOF from the parent. Note that this is only best
+        # effort and will not take effect if the python process is violently terminated.
+        if on_windows:
+                # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
+                # (because the UNIX "exec" command is not available). This means we cannot simply
+                # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
+                # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
+                # child processes.
+                def killChild():
+                    Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
+                atexit.register(killChild)

         # Create a thread to echo output from the GatewayServer, which is required
         # for Java log output to show up:

From eeb34a034e07682232f6b7f357233f737613be2f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 18:24:50 -0700
Subject: [PATCH 04/10] Update outdated comment (minor)

---
 bin/spark-submit | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bin/spark-submit b/bin/spark-submit
index 32c911cd0438..277c4ce571ca 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,7 +17,7 @@
 # limitations under the License.
 #

-# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!

 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
 ORIG_ARGS=("$@")

From 803218bd6795ab09d32049e79ea1c11a897a21c7 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 20:18:54 -0700
Subject: [PATCH 05/10] Actually respect SPARK_*_CLASSPATH

This was simply missing in the existing code.
---
 bin/compute-classpath.cmd | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index 58710cd1bd54..5ad52452a5c9 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

 rem Build up classpath
-set CLASSPATH=%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+
 if exist "%FWDIR%RELEASE" (
   for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
     set ASSEMBLY_JAR=%%d

From 72004c2b6bd502244cb1e9e055d26bd1f064ce5a Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 20:34:46 -0700
Subject: [PATCH 06/10] Actually respect --driver-java-options

If you `set` a variable in a conditional, you have to use !VAR! instead
of %VAR% after enabling delayed expansion. Don't ask.
---
 bin/spark-class2.cmd | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index 3d97caa8de17..b8c7799e507d 100644
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -75,9 +75,9 @@ rem The repl also uses SPARK_REPL_OPTS.
 ) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
   if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
-    set OUR_JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
   ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
-    set OUR_JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH%
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
   )
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
   if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%

From afcffead52e508808c12eb32f281c2ecb0800480 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 20:44:22 -0700
Subject: [PATCH 07/10] Fix style (minor)

---
 python/pyspark/java_gateway.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index b4873ae98e4c..19c2cb85aeb2 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -77,14 +77,14 @@ def preexec_func():
         # with any opportunity to read an EOF from the parent. Note that this is only best
         # effort and will not take effect if the python process is violently terminated.
         if on_windows:
-                # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
-                # (because the UNIX "exec" command is not available). This means we cannot simply
-                # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
-                # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
-                # child processes.
-                def killChild():
-                    Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
-                atexit.register(killChild)
+            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
+            # (because the UNIX "exec" command is not available). This means we cannot simply
+            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
+            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
+            # child processes.
+            def killChild():
+            Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
+            atexit.register(killChild)

         # Create a thread to echo output from the GatewayServer, which is required
         # for Java log output to show up:

From 22b1acd32a44c477548108daa2f1328499b98556 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 20:52:47 -0700
Subject: [PATCH 08/10] Fix style again (minor)

---
 python/pyspark/java_gateway.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 19c2cb85aeb2..afcc4118bcb0 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -83,7 +83,7 @@ def preexec_func():
             # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
             # child processes.
             def killChild():
-            Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
+                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
             atexit.register(killChild)

         # Create a thread to echo output from the GatewayServer, which is required

From 92e6047a5bb77d1b018456ea3d62c0805e801953 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 21:03:17 -0700
Subject: [PATCH 09/10] Update a few comments (minor)

---
 bin/spark-class2.cmd                                         | 2 +-
 bin/spark-submit.cmd                                         | 1 -
 .../apache/spark/deploy/SparkSubmitDriverBootstrapper.scala | 2 +-
 python/pyspark/java_gateway.py                               | 4 ++--
 4 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index b8c7799e507d..6c5672819172 100644
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -127,7 +127,7 @@ if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

 rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
 rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
-rem the driver JVM itself. Instead of handling this complexity in Bash, we launch a separate JVM
+rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
 rem to prepare the launch environment of this driver JVM.

 rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.

diff --git a/bin/spark-submit.cmd b/bin/spark-submit.cmd
index 8f79b0837130..cf6046d1547a 100644
--- a/bin/spark-submit.cmd
+++ b/bin/spark-submit.cmd
@@ -29,7 +29,6 @@ set SPARK_SUBMIT_DRIVER_MEMORY=
 set SPARK_SUBMIT_LIBRARY_PATH=
 set SPARK_SUBMIT_CLASSPATH=
 set SPARK_SUBMIT_OPTS=
-set SPARK_DRIVER_MEMORY=
 set SPARK_SUBMIT_BOOTSTRAP_DRIVER=

 :loop

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index c4ab1beae83d..7ca96ed57c2d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -139,7 +139,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
     stderrThread.start()

     // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that also reads from stdin and contends with the subprocess.
+    // a thread that contends with the subprocess in reading from System.in.
     if (Utils.isWindows) {
       // For the PySpark shell, the termination of this process is handled in java_gateway.py
       process.waitFor()

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index afcc4118bcb0..d0a4d562e14e 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -70,7 +70,7 @@ def preexec_func():
             error_msg += "--------------------------------------------------------------\n"
         raise Exception(error_msg)

-        # Ensure the Java child processes do not linger after python has exited in Windows.
+        # In Windows, ensure the Java child processes do not linger after Python has exited.
         # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
         # the parent process' stdin sends an EOF). In Windows, however, this is not possible
         # because java.lang.Process reads directly from the parent process' stdin, contending
@@ -81,7 +81,7 @@ def preexec_func():
             # (because the UNIX "exec" command is not available). This means we cannot simply
             # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
             # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
-            # child processes.
+            # child processes in the tree.
             def killChild():
                 Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
             atexit.register(killChild)

From 881a8f0d03046bf074776a8e6c820a99fad02d11 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Tue, 26 Aug 2014 22:40:43 -0700
Subject: [PATCH 10/10] Add reference to Windows taskkill

---
 python/pyspark/java_gateway.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index d0a4d562e14e..9c70fa5c16d0 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -81,7 +81,7 @@ def preexec_func():
             # (because the UNIX "exec" command is not available). This means we cannot simply
             # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
             # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
-            # child processes in the tree.
+            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
             def killChild():
                 Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
             atexit.register(killChild)
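
The shutdown-hook mechanism that patches 02, 03, and 10 converge on can be illustrated with a small standalone Python sketch, independent of PySpark. Everything below is illustrative only: "child.cmd" and kill_child_tree are hypothetical stand-ins for spark-submit.cmd and killChild, not names from the patches.

    # Minimal sketch of the atexit + taskkill tree-kill technique, assuming Windows.
    # proc.kill() alone would terminate only the top-level cmd shell, not the
    # grandchildren it spawned (just as spark-submit.cmd spawns the JVM), so the
    # hook asks taskkill to remove the whole process tree rooted at proc.pid.
    import atexit
    import platform
    from subprocess import Popen

    if platform.system() == "Windows":
        proc = Popen(["cmd", "/c", "child.cmd"])

        def kill_child_tree():
            # /t kills the process tree, /f forces termination
            Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])

        atexit.register(kill_child_tree)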
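Similarly, the findstr check that patch 01 adds to bin/spark-submit.cmd is what decides whether spark-class2.cmd routes through SparkSubmitDriverBootstrapper: in client mode it scans the properties file for spark.driver.memory or spark.driver.extra* entries. A rough Python equivalent of that decision is sketched below under the same caveat; the function name and example path are illustrative, not part of the patches.

    import os
    import re

    # Mirrors findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra":
    # any line whose (possibly indented) key starts with spark.driver.memory or
    # spark.driver.extra means the driver JVM's environment must be prepared early.
    _SPECIAL_DRIVER_CONFIG = re.compile(r"^[\t ]*spark\.driver\.(memory|extra)")

    def needs_bootstrap(properties_file):
        if not os.path.isfile(properties_file):
            return False
        with open(properties_file) as f:
            return any(_SPECIAL_DRIVER_CONFIG.match(line) for line in f)

    # Example usage (path is illustrative):
    # needs_bootstrap(r"C:\spark\conf\spark-defaults.conf")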