3 changes: 2 additions & 1 deletion bin/compute-classpath.cmd
@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

rem Build up classpath
-set CLASSPATH=%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf

if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
set ASSEMBLY_JAR=%%d
46 changes: 39 additions & 7 deletions bin/spark-class2.cmd
mode changed 100755 → 100644
@@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

+rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
setlocal enabledelayedexpansion

set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given

if not "x%SPARK_MEM%"=="x" (
echo Warning: SPARK_MEM is deprecated, please use a more specific config option
-echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+echo e.g., spark.executor.memory or spark.driver.memory.
)

rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -67,10 +69,18 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%

-rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.repl.Main" (
-set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+rem The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
+set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
+if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
+set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
Comment (Contributor Author): The !VAR_NAME! syntax is explained here: andrewor14@72004c2. If we used %VAR_NAME%, this wouldn't pick up the latest value set in L76, because we're inside a conditional block.

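For illustration, a minimal standalone sketch of the delayed-expansion behavior described in the comment above (the script and variable names are hypothetical, not part of this PR):

@echo off
setlocal enabledelayedexpansion
set FOO=first
if "1"=="1" (
    set FOO=second
    rem %FOO% was substituted once, when the whole block was parsed, so this prints "first"
    echo percent expansion: %FOO%
    rem !FOO! is expanded at execution time, so this prints "second"
    echo delayed expansion: !FOO!
)
endlocal

This is why spark-class2.cmd runs under setlocal enabledelayedexpansion and uses !OUR_JAVA_OPTS! when appending inside conditionals.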
+) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
+set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
+)
+if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
) else (
set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
@@ -80,9 +90,9 @@ rem Set JAVA_OPTS to be able to load native libraries and to set heap size
for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
if "%jversion%" geq "1.8.0" (
-set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
) else (
-set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
)
rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!

@@ -115,5 +125,27 @@ rem Figure out where java is.
set RUNNER=java
if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java

"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
rem to prepare the launch environment of this driver JVM.

rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
rem be done here because the Windows "shift" command does not work in a conditional block.
set BOOTSTRAP_ARGS=
shift
:start_parse
if "%~1" == "" goto end_parse
set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
shift
goto start_parse
:end_parse

if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
set SPARK_CLASS=1
"%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
Comment (Contributor Author): This is not working yet. See the commit message in andrewor14@83ebe60 for more detail.

Comment (Contributor Author): This should be fixed in commit andrewor14@f97daa2.

Comment (Contributor Author): JK, this was actually fixed in andrewor14@35caecc.

+) else (
+"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+)
:exit
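An aside on the "shift does not work in a conditional block" remark above: shift itself does execute inside a parenthesized block, but %1-style references in that block were already substituted when the whole block was parsed, and delayed expansion (!...!) is not available for positional parameters. A minimal hypothetical demo (not part of this PR), invoked as demo.cmd a b:

@echo off
if "1"=="1" (
    shift
    rem %1 below was substituted at parse time, so this still echoes "a"
    echo inside block: %1
)
rem Outside the block, %1 reflects the shift and echoes "b"
echo outside block: %1

Hence the parsing loop above is written with labels and goto, entirely outside any conditional.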
2 changes: 1 addition & 1 deletion bin/spark-submit
@@ -17,7 +17,7 @@
# limitations under the License.
#

-# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!

export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
ORIG_ARGS=("$@")
34 changes: 23 additions & 11 deletions bin/spark-submit.cmd
@@ -17,23 +17,28 @@ rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

+rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
set SPARK_HOME=%~dp0..
set ORIG_ARGS=%*

-rem Clear the values of all variables used
-set DEPLOY_MODE=
-set DRIVER_MEMORY=
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
set SPARK_SUBMIT_OPTS=
set SPARK_DRIVER_MEMORY=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=

:loop
if [%1] == [] goto continue
if [%1] == [--deploy-mode] (
-set DEPLOY_MODE=%2
+set SPARK_SUBMIT_DEPLOY_MODE=%2
+) else if [%1] == [--properties-file] (
+set SPARK_SUBMIT_PROPERTIES_FILE=%2
) else if [%1] == [--driver-memory] (
-set DRIVER_MEMORY=%2
+set SPARK_SUBMIT_DRIVER_MEMORY=%2
) else if [%1] == [--driver-library-path] (
set SPARK_SUBMIT_LIBRARY_PATH=%2
) else if [%1] == [--driver-class-path] (
@@ -45,12 +50,19 @@ if [%1] == [] goto continue
goto loop
:continue

-if [%DEPLOY_MODE%] == [] (
-set DEPLOY_MODE=client
-)
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.

-if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
-set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+rem Parse the properties file only if the special configs exist
+for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
+%SPARK_SUBMIT_PROPERTIES_FILE%') do (
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+)
+)
)

cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
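The final line hands off to spark-class.cmd in a child shell started with cmd /V /E /C, which enables delayed variable expansion (/V) and command extensions (/E) for the bootstrap logic above. For illustration, a spark-defaults.conf containing either of the config prefixes that the findstr call matches would set SPARK_SUBMIT_BOOTSTRAP_DRIVER; the values below are hypothetical examples, not taken from this PR:

spark.driver.memory            2g
spark.driver.extraJavaOptions  -XX:+UseCompressedOops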
@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
val process = builder.start()

// Redirect stdin, stdout, and stderr to/from the child JVM
-val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
-stdinThread.start()
stdoutThread.start()
stderrThread.start()

-// Terminate on broken pipe, which signals that the parent process has exited. This is
-// important for the PySpark shell, where Spark submit itself is a python subprocess.
-stdinThread.join()
-process.destroy()
+// In Windows, the subprocess reads directly from our stdin, so we should avoid spawning

Comment (Contributor): Is there any public location where this behavior is specified (e.g., for a developer doc)?

Comment (Contributor Author): Not that I'm aware of :/
+// a thread that contends with the subprocess in reading from System.in.
+if (Utils.isWindows) {
+// For the PySpark shell, the termination of this process is handled in java_gateway.py
+process.waitFor()
+} else {
+// Terminate on broken pipe, which signals that the parent process has exited. This is
+// important for the PySpark shell, where Spark submit itself is a python subprocess.
+val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+stdinThread.start()
+stdinThread.join()
+process.destroy()
+}
}

}
17 changes: 17 additions & 0 deletions python/pyspark/java_gateway.py
@@ -15,6 +15,7 @@
# limitations under the License.
#

+import atexit
import os
import sys
import signal
@@ -69,6 +70,22 @@ def preexec_func():
error_msg += "--------------------------------------------------------------\n"
raise Exception(error_msg)

+# In Windows, ensure the Java child processes do not linger after Python has exited.
+# In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
+# the parent process' stdin sends an EOF). In Windows, however, this is not possible
+# because java.lang.Process reads directly from the parent process' stdin, contending
+# with any opportunity to read an EOF from the parent. Note that this is only best
+# effort and will not take effect if the python process is violently terminated.
+if on_windows:
+    # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
+    # (because the UNIX "exec" command is not available). This means we cannot simply
+    # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
+    # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
+    # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
+    def killChild():
+        Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
Comment (Contributor): I would link to wherever you found this - maybe here? http://stackoverflow.com/questions/1230669/subprocess-deleting-child-processes-in-windows
Also, this is a bit scary to just issue kill commands (let's hope proc.pid is correct!), but it appears to be the best way to do this ATM.

Comment (Contributor Author): Sure. If we are to link it, I'd rather provide a more official one, say http://technet.microsoft.com/en-us/library/bb491009.aspx
+    atexit.register(killChild)

# Create a thread to echo output from the GatewayServer, which is required
# for Java log output to show up:
class EchoOutputThread(Thread):