This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 9d8e981

Support executor java options.
1 parent f7b5820 commit 9d8e981

6 files changed: +66 -25 lines changed


resource-managers/kubernetes/README.md

Lines changed: 2 additions & 5 deletions
@@ -14,14 +14,11 @@ important matters to keep in mind when developing this feature.
 
 # Building Spark with Kubernetes Support
 
-To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile the Kubernetes core implementation module along with its dependencies:
+To build Spark with Kubernetes support, use the `kubernetes` profile when invoking Maven. For example, to simply compile
+the Kubernetes core implementation module along with its dependencies:
 
     build/mvn compile -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
 
-If this is the first time you compile the Kubernetes core implementation module, run the following command to install the dependencies and compile:
-
-    build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
-
 To build a distribution of Spark with Kubernetes support, use the `dev/make-distribution.sh` script, and add the
 `kubernetes` profile as part of the build arguments. Any other build arguments can be specified as one would expect when
 building Spark normally. For example, to build Spark against Hadoop 2.7 and Kubernetes:

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala

Lines changed: 12 additions & 4 deletions
@@ -455,7 +455,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withValue(cp)
           .build()
       }
-    val requiredEnv = (Seq(
+    val executorExtraJavaOptionsEnv = conf
+      .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
@@ -475,7 +484,7 @@
           .withNewFieldRef("v1", "status.podIP")
           .build())
         .build()
-      )
+      ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -495,8 +504,7 @@
         .addToLimits("memory", executorMemoryLimitQuantity)
         .addToRequests("cpu", executorCpuQuantity)
         .endResources()
-      .addAllToEnv(requiredEnv.asJava)
-      .addToEnv(executorExtraClasspathEnv.toSeq: _*)
+      .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .build()
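The change above replaces the fixed requiredEnv list with executorEnv, which additionally carries one environment variable per executor JVM option. As a minimal, self-contained sketch (not part of the commit; plain Scala with no Spark or fabric8 dependencies), the naming scheme looks roughly like this. The whitespace split stands in for Utils.splitCommandString and does not handle quoted values, and the SPARK_JAVA_OPT_ prefix is assumed to be the value of ENV_JAVA_OPT_PREFIX, matching what the Dockerfiles below grep for:

```scala
object ExecutorJavaOptsEnvNamingSketch {
  // Assumed prefix; the scheduler backend uses the ENV_JAVA_OPT_PREFIX constant,
  // and the executor Dockerfiles grep for SPARK_JAVA_OPT_.
  val EnvJavaOptPrefix = "SPARK_JAVA_OPT_"

  // Simplified stand-in for Utils.splitCommandString: splits on whitespace only,
  // so unlike the real helper it does not honor quoted option values.
  def splitOpts(opts: String): Seq[String] =
    opts.split("\\s+").toSeq.filter(_.nonEmpty)

  // Each option token becomes one indexed (name, value) pair, mirroring the
  // EnvVarBuilder loop added in this commit.
  def toEnvVars(extraJavaOptions: Option[String]): Seq[(String, String)] =
    extraJavaOptions
      .map { opts =>
        splitOpts(opts).zipWithIndex.map { case (opt, index) =>
          (s"$EnvJavaOptPrefix$index", opt)
        }
      }
      .getOrElse(Seq.empty)

  def main(args: Array[String]): Unit = {
    toEnvVars(Some("-DsimpleExecutorConf=simpleExecutorConfValue -XX:+PrintGCDetails"))
      .foreach { case (name, value) => println(s"$name=$value") }
    // Prints:
    // SPARK_JAVA_OPT_0=-DsimpleExecutorConf=simpleExecutorConfValue
    // SPARK_JAVA_OPT_1=-XX:+PrintGCDetails
  }
}
```

Emitting one variable per option means the container never has to re-tokenize a single space-joined string, which is what lets options containing spaces (exercised by the integration test below) survive the trip to the executor JVM.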

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile

Lines changed: 5 additions & 4 deletions
@@ -38,10 +38,11 @@ ENV PYSPARK_PYTHON python
 ENV PYSPARK_DRIVER_PYTHON python
 ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
 
-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile

Lines changed: 5 additions & 4 deletions
@@ -23,11 +23,12 @@ FROM spark-base
 
 COPY examples /opt/spark/examples
 
-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
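Both Dockerfile CMDs now collect every SPARK_JAVA_OPT_* variable into a bash array and splice it into the java command line. A rough Scala rendering of that shell pipeline (illustrative only, not part of the commit) may make the data flow easier to follow; note that this sketch orders options by their numeric suffix, whereas the env | grep pipeline emits them in whatever order env prints:

```scala
object CollectExecutorJavaOptsSketch {
  // Mirrors, in Scala, what the CMD does in shell: keep the value of every
  // SPARK_JAVA_OPT_<n> variable, one JVM argument per variable.
  def executorJavaOpts(env: Map[String, String]): Seq[String] =
    env.toSeq
      .collect { case (key, value) if key.startsWith("SPARK_JAVA_OPT_") =>
        (key.stripPrefix("SPARK_JAVA_OPT_").toInt, value)
      }
      .sortBy(_._1) // sorted by index here for readability
      .map(_._2)

  def main(args: Array[String]): Unit = {
    val sampleEnv = Map(
      "SPARK_EXECUTOR_MEMORY" -> "1g",
      "SPARK_JAVA_OPT_1" -> "-XX:+PrintGCDetails",
      "SPARK_JAVA_OPT_0" -> "-DsimpleExecutorConf=simpleExecutorConfValue")
    println(executorJavaOpts(sampleEnv).mkString(" "))
    // Prints: -DsimpleExecutorConf=simpleExecutorConfValue -XX:+PrintGCDetails
  }
}
```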

resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala

Lines changed: 29 additions & 5 deletions
@@ -29,28 +29,52 @@ private[spark] object JavaOptionsTest {
 
   def main(args: Array[String]): Unit = {
     // scalastyle:off println
-    if (args.length != 1) {
+    if (args.length != 2) {
       println(s"Invalid arguments: ${args.mkString(",")}." +
-        s"Usage: JavaOptionsTest <driver-java-options-list-file>")
+        s"Usage: JavaOptionsTest <driver-java-options-list-file> <executor-java-options-list-file>")
       System.exit(1)
     }
     val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
+    val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1))
     val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
       case (optKey, optValue) => System.getProperty(optKey) != optValue
     }
     if (nonMatchingDriverOptions.nonEmpty) {
       println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
         s" But these options did not match: $nonMatchingDriverOptions.")
       val sysProps = Maps.fromProperties(System.getProperties).asScala
-      println("System properties are:")
+      println("Driver system properties are:")
       for (prop <- sysProps) {
         println(s"Key: ${prop._1}, Value: ${prop._2}")
       }
       System.exit(1)
     }
 
-    // TODO support spark.executor.extraJavaOptions and test here.
-    println(s"All expected JVM options were present on the driver and executors.")
+    val spark = SparkSession.builder().getOrCreate().sparkContext
+    val nonMatchingExecutorOptions = try {
+      spark.parallelize(Seq(0)).flatMap { _ =>
+        expectedExecutorJavaOptions.filter {
+          case (optKey, optValue) => System.getProperty(optKey) != optValue
+        }
+      }.collectAsMap()
+    } finally {
+      spark.stop()
+    }
+    if (nonMatchingExecutorOptions.nonEmpty) {
+      val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ =>
+        Maps.fromProperties(System.getProperties).asScala
+      }.collectAsMap()
+      println(s"The executor's JVM options did not match. Expected" +
+        s" $expectedExecutorJavaOptions. But these options did not" +
+        s" match: $nonMatchingExecutorOptions.")
+      println("Executor system properties are:")
+      for (prop <- executorSysProps) {
+        println(s"Key: ${prop._1}, Value: ${prop._2}")
+      }
+      System.exit(1)
+    } else {
+      println("All expected JVM options were present on the driver and executors.")
+    }
     // scalastyle:on println
   }
 

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 13 additions & 3 deletions
@@ -234,18 +234,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     launchStagingServer(SSLOptions(), None)
     val driverJvmOptionsFile = storeJvmOptionsInTempFile(
       Map("simpleDriverConf" -> "simpleDriverConfValue",
-          "driverconfwithspaces" -> "driver conf with spaces value"),
+        "driverconfwithspaces" -> "driver conf with spaces value"),
       "driver-jvm-options.properties",
       "JVM options that should be set on the driver.")
+    val executorJvmOptionsFile = storeJvmOptionsInTempFile(
+      Map("simpleExecutorConf" -> "simpleExecutorConfValue",
+        "executor conf with spaces" -> "executor conf with spaces value"),
+      "executor-jvm-options.properties",
+      "JVM options that should be set on the executors.")
     sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
       "-DsimpleDriverConf=simpleDriverConfValue" +
         " -Ddriverconfwithspaces='driver conf with spaces value'")
-    sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
+    sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+      "-DsimpleExecutorConf=simpleExecutorConfValue" +
+        " -D\'executor conf with spaces\'=\'executor conf with spaces value\'")
+    sparkConf.set("spark.files",
+      Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath)
+        .mkString(","))
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       JAVA_OPTIONS_MAIN_CLASS,
       Seq(s"All expected JVM options were present on the driver and executors."),
-      Array(driverJvmOptionsFile.getName),
+      Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName),
       Seq.empty[String])
   }
 
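For context, the end-to-end behavior this test exercises can be summed up in a small user-side sketch (hypothetical code, not part of the commit): any value set under spark.executor.extraJavaOptions, the key behind SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, is now forwarded to the Kubernetes executor pods.

```scala
import org.apache.spark.SparkConf

object ExecutorJavaOptionsUsageSketch {
  def main(args: Array[String]): Unit = {
    // With this commit, these options are forwarded to executor pods as
    // SPARK_JAVA_OPT_<n> environment variables and prepended to the executor JVM command.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions",
        "-DsimpleExecutorConf=simpleExecutorConfValue" +
          " -D'executor conf with spaces'='executor conf with spaces value'")
    println(conf.get("spark.executor.extraJavaOptions"))
  }
}
```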
