This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
@@ -455,7 +455,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withValue(cp)
           .build()
       }
-      val requiredEnv = (Seq(
+      val executorExtraJavaOptionsEnv = conf
+        .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+        .map { opts =>
+          val delimitedOpts = Utils.splitCommandString(opts)
+          delimitedOpts.zipWithIndex.map {
+            case (opt, index) =>
+              new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+          }
+        }.getOrElse(Seq.empty[EnvVar])
+      val executorEnv = (Seq(
         (ENV_EXECUTOR_PORT, executorPort.toString),
         (ENV_DRIVER_URL, driverUrl),
         // Executor backend expects integral value for executor cores, so round it up to an int.
@@ -475,7 +484,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
             .withNewFieldRef("v1", "status.podIP")
             .build())
           .build()
-      )
+      ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -495,8 +504,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .addToLimits("memory", executorMemoryLimitQuantity)
         .addToRequests("cpu", executorCpuQuantity)
         .endResources()
-      .addAllToEnv(requiredEnv.asJava)
-      .addToEnv(executorExtraClasspathEnv.toSeq: _*)
+      .addAllToEnv(executorEnv.asJava)
       .withPorts(requiredPorts.asJava)
       .build()

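Editor's note (a sketch, not part of the PR): the new code above turns each token of spark.executor.extraJavaOptions into its own numbered environment variable. ENV_JAVA_OPT_PREFIX is evidently "SPARK_JAVA_OPT_", since that is what the executor Dockerfiles below grep for. A minimal illustration of the mapping, with a deliberately naive stand-in for Spark's Utils.splitCommandString; the object and helper names here are hypothetical:

// Hypothetical sketch: how one executor option string becomes indexed
// SPARK_JAVA_OPT_<n> environment variables.
object JavaOptsToEnvSketch {
  private val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" // matches the Dockerfiles' grep

  // Naive stand-in for Utils.splitCommandString: whitespace split that keeps
  // single-quoted tokens intact; the real helper handles more quoting cases.
  private def naiveSplit(s: String): Seq[String] =
    """'([^']*)'|(\S+)""".r.findAllMatchIn(s)
      .map(m => Option(m.group(1)).getOrElse(m.group(2)))
      .toSeq

  def main(args: Array[String]): Unit = {
    val opts = "-Dsimple=value '-Dspaced key=spaced value'"
    naiveSplit(opts).zipWithIndex.foreach { case (opt, index) =>
      println(s"$ENV_JAVA_OPT_PREFIX$index=$opt")
    }
    // Prints:
    //   SPARK_JAVA_OPT_0=-Dsimple=value
    //   SPARK_JAVA_OPT_1=-Dspaced key=spaced value
  }
}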
@@ -38,10 +38,11 @@ ENV PYSPARK_PYTHON python
 ENV PYSPARK_DRIVER_PYTHON python
 ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
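Editor's note on the CMD pipeline (it appears identically in the next Dockerfile): env | grep SPARK_JAVA_OPT_ selects the indexed variables set by the scheduler backend, sed 's/[^=]*=\(.*\)/\1/g' drops everything up to the first '=' (the variable name, leaving the option value even when that value itself contains '='), and readarray -t turns each remaining line into one bash array element, so "${SPARK_EXECUTOR_JAVA_OPTS[@]}" passes options containing spaces to the JVM as single arguments. Separately, [ -z ${VAR+x} ] is the set-ness test: ${VAR+x} expands to x only when VAR is set, so it corrects the old unquoted [ -z ${VAR} ] form, which also fired on empty values. A rough Scala mirror of the reconstruction step; the object and method names are hypothetical:

// Hypothetical mirror of the entrypoint's env | grep | sed + readarray step.
object CollectJavaOptsSketch {
  def executorJavaOpts(env: Map[String, String]): Seq[String] =
    env.toSeq
      .collect { case (name, value) if name.startsWith("SPARK_JAVA_OPT_") =>
        (name.stripPrefix("SPARK_JAVA_OPT_").toInt, value) // assumes numeric suffixes only
      }
      .sortBy(_._1) // restore index order; the shell pipeline trusts env's output order
      .map(_._2)

  def main(args: Array[String]): Unit =
    executorJavaOpts(sys.env).foreach(println)
}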
@@ -23,11 +23,12 @@ FROM spark-base

 COPY examples /opt/spark/examples

-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
@@ -29,28 +29,53 @@ private[spark] object JavaOptionsTest {

   def main(args: Array[String]): Unit = {
     // scalastyle:off println
-    if (args.length != 1) {
+    if (args.length != 2) {
       println(s"Invalid arguments: ${args.mkString(",")}." +
-        s"Usage: JavaOptionsTest <driver-java-options-list-file>")
+        s"Usage: JavaOptionsTest <driver-java-options-list-file> <executor-java-options-list-file>")
       System.exit(1)
     }
     val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
+    val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1))
     val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
       case (optKey, optValue) => System.getProperty(optKey) != optValue
     }
     if (nonMatchingDriverOptions.nonEmpty) {
       println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
         s" But these options did not match: $nonMatchingDriverOptions.")
       val sysProps = Maps.fromProperties(System.getProperties).asScala
-      println("System properties are:")
+      println("Driver system properties are:")
       for (prop <- sysProps) {
         println(s"Key: ${prop._1}, Value: ${prop._2}")
       }
       System.exit(1)
     }

-    // TODO support spark.executor.extraJavaOptions and test here.
-    println(s"All expected JVM options were present on the driver and executors.")
+    val spark = SparkSession.builder().getOrCreate().sparkContext
+    try {
+      val nonMatchingExecutorOptions = spark.parallelize(Seq(0)).flatMap { _ =>
+        expectedExecutorJavaOptions.filter {
+          case (optKey, optValue) => System.getProperty(optKey) != optValue
+        }
+      }.collectAsMap()
+      if (nonMatchingExecutorOptions.nonEmpty) {
+        // Collect the executors' system properties before spark.stop(): jobs fail on a stopped context.
+        val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ =>
+          Maps.fromProperties(System.getProperties).asScala
+        }.collectAsMap()
+        println(s"The executor's JVM options did not match. Expected" +
+          s" $expectedExecutorJavaOptions. But these options did not" +
+          s" match: $nonMatchingExecutorOptions.")
+        println("Executor system properties are:")
+        for (prop <- executorSysProps) {
+          println(s"Key: ${prop._1}, Value: ${prop._2}")
+        }
+        System.exit(1)
+      } else {
+        println("All expected JVM options were present on the driver and executors.")
+      }
+    } finally {
+      spark.stop()
+    }
     // scalastyle:on println
   }

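For reference, loadPropertiesFromFile is a helper defined elsewhere in this file; below is a hypothetical equivalent built on java.util.Properties, as a sketch only (the real implementation may differ):

import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical stand-in for the loadPropertiesFromFile helper used above.
def loadPropertiesFromFile(filePath: String): Map[String, String] = {
  val properties = new Properties()
  val inputStream = new FileInputStream(filePath)
  try {
    properties.load(inputStream) // keys and values may contain spaces when escaped in the file
  } finally {
    inputStream.close()
  }
  properties.asScala.toMap
}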
@@ -234,18 +234,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     launchStagingServer(SSLOptions(), None)
     val driverJvmOptionsFile = storeJvmOptionsInTempFile(
       Map("simpleDriverConf" -> "simpleDriverConfValue",
-        "driverconfwithspaces" -> "driver conf with spaces value"),
+          "driverconfwithspaces" -> "driver conf with spaces value"),
       "driver-jvm-options.properties",
       "JVM options that should be set on the driver.")
+    val executorJvmOptionsFile = storeJvmOptionsInTempFile(
+      Map("simpleExecutorConf" -> "simpleExecutorConfValue",
+          "executor conf with spaces" -> "executor conf with spaces value"),
+      "executor-jvm-options.properties",
+      "JVM options that should be set on the executors.")
     sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
       "-DsimpleDriverConf=simpleDriverConfValue" +
         " -Ddriverconfwithspaces='driver conf with spaces value'")
-    sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
+    sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+      "-DsimpleExecutorConf=simpleExecutorConfValue" +
+        " -D'executor conf with spaces'='executor conf with spaces value'")
+    sparkConf.set("spark.files",
+      Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath)
+        .mkString(","))
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       JAVA_OPTIONS_MAIN_CLASS,
       Seq(s"All expected JVM options were present on the driver and executors."),
-      Array(driverJvmOptionsFile.getName),
+      Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName),
       Seq.empty[String])
   }

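A spot-check of how the quoted executor option string above should tokenize. Utils.splitCommandString is Spark's real helper (though private[spark], so this snippet only compiles inside Spark's own packages), and the expected output is an assumption based on its quote handling:

import org.apache.spark.util.Utils

// Assumed result: quotes are stripped and each quoted span stays in one token:
//   -DsimpleExecutorConf=simpleExecutorConfValue
//   -Dexecutor conf with spaces=executor conf with spaces value
Utils.splitCommandString(
  "-DsimpleExecutorConf=simpleExecutorConfValue" +
    " -D'executor conf with spaces'='executor conf with spaces value'")
  .foreach(println)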