Commit 00b9dfe
[SPARK-3167] Handle special driver configs in Windows

This is an effort to bring the Windows scripts up to speed after recent splashing changes in #1845.

Author: Andrew Or <[email protected]>

Closes #2129 from andrewor14/windows-config and squashes the following commits:

881a8f0 [Andrew Or] Add reference to Windows taskkill
92e6047 [Andrew Or] Update a few comments (minor)
22b1acd [Andrew Or] Fix style again (minor)
afcffea [Andrew Or] Fix style (minor)
72004c2 [Andrew Or] Actually respect --driver-java-options
803218b [Andrew Or] Actually respect SPARK_*_CLASSPATH
eeb34a0 [Andrew Or] Update outdated comment (minor)
35caecc [Andrew Or] In Windows, actually kill Java processes on exit
f97daa2 [Andrew Or] Fix Windows spark shell stdin issue
83ebe60 [Andrew Or] Parse special driver configs in Windows (broken)

Conflicts:
    bin/spark-class2.cmd

1 parent 2381e90 commit 00b9dfe

6 files changed: +95 -26 lines

bin/compute-classpath.cmd
Lines changed: 2 additions & 1 deletion

@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Build up classpath
-set CLASSPATH=%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+
 if exist "%FWDIR%RELEASE" (
   for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
     set ASSEMBLY_JAR=%%d
bin/spark-class2.cmd
File mode changed: 100755 -> 100644
Lines changed: 39 additions & 7 deletions

@@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
+rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 setlocal enabledelayedexpansion
 
 set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given
 
 if not "x%SPARK_MEM%"=="x" (
   echo Warning: SPARK_MEM is deprecated, please use a more specific config option
-  echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+  echo e.g., spark.executor.memory or spark.driver.memory.
 )
 
 rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -67,18 +69,26 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
   if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
 
-rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.repl.Main" (
-  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+rem The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
+  if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
+  ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
+  )
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+  if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
 ) else (
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
 )
 
-rem Set JAVA_OPTS to be able to load native libraries and to set heap size
-set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
+rem Set JAVA_OPTS to be able to load native libraries and to set heap size
+set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 
 rem Test whether the user has built Spark
 if exist "%FWDIR%RELEASE" goto skip_build_test
@@ -109,5 +119,27 @@ rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
-"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
+rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
+rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
+rem to prepare the launch environment of this driver JVM.
+
+rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
+rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
+rem be done here because the Windows "shift" command does not work in a conditional block.
+set BOOTSTRAP_ARGS=
+shift
+:start_parse
+if "%~1" == "" goto end_parse
+set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
+shift
+goto start_parse
+:end_parse
+
+if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
+  set SPARK_CLASS=1
+  "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
+) else (
+  "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+)
:exit
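The tail of this script is the heart of the change: because batch "shift" does not take effect inside a conditional block (as the comment above notes), the main class must be stripped from the arguments with an explicit parse loop before the conditional. A hedged Python sketch of the same dispatch; the launch helper is illustrative, and the assumption here is that java picks the classpath up from the CLASSPATH environment variable when no -cp flag is passed:

import os
import subprocess

# Illustrative sketch (not from the patch) of the dispatch at the end of
# spark-class2.cmd: drop the main class from the arguments, then either hand
# the rest to SparkSubmitDriverBootstrapper or run the requested class directly.
def launch(runner, classpath, java_opts, argv):
    bootstrap_args = argv[1:]  # drop argv[0], e.g. org.apache.spark.deploy.SparkSubmit
    if os.environ.get("SPARK_SUBMIT_BOOTSTRAP_DRIVER"):
        # The bootstrapper JVM prepares the driver JVM's launch environment;
        # it inherits the classpath through the CLASSPATH environment variable.
        env = dict(os.environ, SPARK_CLASS="1", CLASSPATH=classpath)
        cmd = [runner, "org.apache.spark.deploy.SparkSubmitDriverBootstrapper"]
        return subprocess.call(cmd + bootstrap_args, env=env)
    # Normal path: launch the requested class with the assembled JVM options.
    return subprocess.call([runner, "-cp", classpath] + java_opts + argv)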

bin/spark-submit
Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
 
 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
 ORIG_ARGS=("$@")

bin/spark-submit.cmd
Lines changed: 23 additions & 11 deletions

@@ -17,23 +17,28 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
+rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 set SPARK_HOME=%~dp0..
 set ORIG_ARGS=%*
 
-rem Clear the values of all variables used
-set DEPLOY_MODE=
-set DRIVER_MEMORY=
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
 set SPARK_SUBMIT_LIBRARY_PATH=
 set SPARK_SUBMIT_CLASSPATH=
 set SPARK_SUBMIT_OPTS=
-set SPARK_DRIVER_MEMORY=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
 
 :loop
 if [%1] == [] goto continue
 if [%1] == [--deploy-mode] (
-  set DEPLOY_MODE=%2
+  set SPARK_SUBMIT_DEPLOY_MODE=%2
+) else if [%1] == [--properties-file] (
+  set SPARK_SUBMIT_PROPERTIES_FILE=%2
 ) else if [%1] == [--driver-memory] (
-  set DRIVER_MEMORY=%2
+  set SPARK_SUBMIT_DRIVER_MEMORY=%2
 ) else if [%1] == [--driver-library-path] (
   set SPARK_SUBMIT_LIBRARY_PATH=%2
 ) else if [%1] == [--driver-class-path] (
@@ -45,12 +50,19 @@ if [%1] == [] goto continue
 goto loop
 :continue
 
-if [%DEPLOY_MODE%] == [] (
-  set DEPLOY_MODE=client
-)
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.
 
-if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
-  set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+  if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+    rem Parse the properties file only if the special configs exist
+    for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
+      %SPARK_SUBMIT_PROPERTIES_FILE%') do (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
+  )
 )
 
 cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
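The findstr invocation above is the detection step: a second bootstrap JVM is only worth launching if the properties file actually sets one of the special driver configs, because those affect the driver JVM's own launch parameters and cannot be applied once it has started. A rough Python equivalent of that predicate (the names are illustrative, not from the patch):

import re

# Illustrative sketch (not from the patch) of the findstr check in
# spark-submit.cmd: bootstrap only if spark.driver.memory or any
# spark.driver.extra* config appears in the properties file.
SPECIAL_CONFIGS = re.compile(r"^[\t ]*spark\.driver\.(memory|extra)")

def needs_bootstrap(properties_file):
    try:
        with open(properties_file) as f:
            return any(SPECIAL_CONFIGS.match(line) for line in f)
    except FileNotFoundError:
        # Mirrors the "if exist" guard in the batch script.
        return False

print(needs_bootstrap(r"conf\spark-defaults.conf"))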

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
Lines changed: 13 additions & 6 deletions

@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val process = builder.start()
 
     // Redirect stdin, stdout, and stderr to/from the child JVM
-    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
-    stdinThread.start()
     stdoutThread.start()
     stderrThread.start()
 
-    // Terminate on broken pipe, which signals that the parent process has exited. This is
-    // important for the PySpark shell, where Spark submit itself is a python subprocess.
-    stdinThread.join()
-    process.destroy()
+    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
+    // a thread that contends with the subprocess in reading from System.in.
+    if (Utils.isWindows) {
+      // For the PySpark shell, the termination of this process is handled in java_gateway.py
+      process.waitFor()
+    } else {
+      // Terminate on broken pipe, which signals that the parent process has exited. This is
+      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      stdinThread.start()
+      stdinThread.join()
+      process.destroy()
+    }
   }
 
 }
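This Scala change sidesteps a Windows-specific stdin race: the child JVM reads the console's stdin directly, so a redirect thread in the bootstrapper would compete with it for input. A small Python sketch of the same pattern, assuming stdout/stderr pumping is handled elsewhere (run_child is illustrative, not from the patch):

import platform
import subprocess
import sys

# Illustrative sketch (not from the patch) of the stdin handling above.
def run_child(cmd):
    is_windows = platform.system() == "Windows"
    # On Windows, let the child inherit stdin directly; a pumping thread here
    # would contend with the child for input.
    proc = subprocess.Popen(cmd, stdin=None if is_windows else subprocess.PIPE)
    if is_windows:
        # Termination is handled externally (for PySpark, by java_gateway.py).
        proc.wait()
    else:
        # Forward our stdin until EOF (the parent has exited), then tear down
        # the child, mirroring stdinThread.join() followed by process.destroy().
        for line in sys.stdin:
            proc.stdin.write(line.encode())
            proc.stdin.flush()
        proc.kill()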

python/pyspark/java_gateway.py
Lines changed: 17 additions & 0 deletions

@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import atexit
 import os
 import sys
 import signal
@@ -69,6 +70,22 @@ def preexec_func():
         error_msg += "--------------------------------------------------------------\n"
         raise Exception(error_msg)
 
+    # In Windows, ensure the Java child processes do not linger after Python has exited.
+    # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
+    # the parent process' stdin sends an EOF). In Windows, however, this is not possible
+    # because java.lang.Process reads directly from the parent process' stdin, contending
+    # with any opportunity to read an EOF from the parent. Note that this is only best
+    # effort and will not take effect if the python process is violently terminated.
+    if on_windows:
+        # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
+        # (because the UNIX "exec" command is not available). This means we cannot simply
+        # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
+        # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
+        # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
+        def killChild():
+            Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
+        atexit.register(killChild)
+
     # Create a thread to echo output from the GatewayServer, which is required
     # for Java log output to show up:
     class EchoOutputThread(Thread):
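Because the Python-side child on Windows is "spark-submit.cmd" rather than the JVM itself, proc.kill() alone would orphan the Java processes. The snippet below is a standalone demonstration of the same taskkill tree-kill trick; the ping child is just a stand-in for a long-running process and is not part of the patch:

import atexit
import platform
import subprocess

# Standalone demo (not from the patch): kill a whole child process tree on
# normal interpreter exit using taskkill, as java_gateway.py now does.
if platform.system() == "Windows":
    # A stand-in long-running child, itself spawned through cmd.exe.
    proc = subprocess.Popen(["cmd", "/c", "ping -n 60 localhost > NUL"])

    def kill_child():
        # /f forces termination; /t extends it to every child in the tree.
        subprocess.Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])

    atexit.register(kill_child)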
