From e03492b1d0400f65819e63c9208dc4ce285f29db Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 22 Aug 2017 11:00:07 -0700
Subject: [PATCH] Support executor java options.

---
 .../KubernetesClusterSchedulerBackend.scala       | 16 ++++++---
 .../src/main/docker/executor-py/Dockerfile        |  9 ++---
 .../src/main/docker/executor/Dockerfile           |  9 ++---
 .../jobs/JavaOptionsTest.scala                    | 34 ++++++++++++++++---
 .../integrationtest/KubernetesSuite.scala         | 16 +++++++--
 5 files changed, 64 insertions(+), 20 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 759914d1ad9cf..df85ab823715a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -455,7 +455,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
         .withValue(cp)
         .build()
     }
-    val requiredEnv = (Seq(
+    val executorExtraJavaOptionsEnv = conf
+      .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS)
+      .map { opts =>
+        val delimitedOpts = Utils.splitCommandString(opts)
+        delimitedOpts.zipWithIndex.map {
+          case (opt, index) =>
+            new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build()
+        }
+      }.getOrElse(Seq.empty[EnvVar])
+    val executorEnv = (Seq(
       (ENV_EXECUTOR_PORT, executorPort.toString),
       (ENV_DRIVER_URL, driverUrl),
       // Executor backend expects integral value for executor cores, so round it up to an int.
@@ -475,7 +484,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .withNewFieldRef("v1", "status.podIP")
           .build())
         .build()
-    )
+    ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq
     val requiredPorts = Seq(
       (EXECUTOR_PORT_NAME, executorPort),
       (BLOCK_MANAGER_PORT_NAME, blockmanagerPort))
@@ -495,8 +504,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           .addToLimits("memory", executorMemoryLimitQuantity)
           .addToRequests("cpu", executorCpuQuantity)
           .endResources()
-        .addAllToEnv(requiredEnv.asJava)
-        .addToEnv(executorExtraClasspathEnv.toSeq: _*)
+        .addAllToEnv(executorEnv.asJava)
         .withPorts(requiredPorts.asJava)
         .build()
 
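Note: spark.executor.extraJavaOptions arrives as one string in which individual
options may contain quoted spaces, so the backend splits it with
Utils.splitCommandString and publishes one environment variable per token
rather than a single concatenated variable. A minimal sketch of that mapping,
assuming ENV_JAVA_OPT_PREFIX resolves to "SPARK_JAVA_OPT_" (consistent with the
grep in the Dockerfiles below); Utils is private[spark], so this only compiles
from within an org.apache.spark package:

    import org.apache.spark.util.Utils

    val opts = "-DsimpleConf=simpleValue -DconfWithSpaces='value with spaces'"
    // splitCommandString honors the single quotes, yielding exactly two tokens:
    // Seq("-DsimpleConf=simpleValue", "-DconfWithSpaces=value with spaces")
    Utils.splitCommandString(opts).zipWithIndex.foreach { case (opt, index) =>
      // One env var per token keeps embedded spaces unambiguous downstream.
      println(s"SPARK_JAVA_OPT_$index=$opt")
    }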
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
index 2b6aae6d7a3f2..a8bb5b362ab52 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
@@ -38,10 +38,11 @@ ENV PYSPARK_PYTHON python
 ENV PYSPARK_DRIVER_PYTHON python
 ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
 
-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
index e0d9b8245ecfc..ab9f67e95a8e5 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor/Dockerfile
@@ -23,11 +23,12 @@ FROM spark-base
 
 COPY examples /opt/spark/examples
 
-# TODO support spark.executor.extraClassPath
 CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+    readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
     if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
     if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
-    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
-    ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
+    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
+    ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
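Note: the container side recovers those variables in three steps: env | grep
selects the SPARK_JAVA_OPT_* entries, sed strips the names and keeps the
values, and readarray loads one option per array element, which
"${SPARK_EXECUTOR_JAVA_OPTS[@]}" then expands as separate argv tokens so
options containing spaces reach the JVM intact. One caveat: env output order
is unspecified, so the options are not guaranteed to arrive in index order. A
sketch of the same recovery with explicit ordering (illustrative only, not
part of the patch):

    // Collect SPARK_JAVA_OPT_<i> values and sort by the numeric suffix,
    // an ordering the env | grep | readarray pipeline does not guarantee.
    val javaOpts: Seq[String] = sys.env.toSeq
      .collect { case (name, value) if name.startsWith("SPARK_JAVA_OPT_") =>
        (name.stripPrefix("SPARK_JAVA_OPT_").toInt, value)
      }
      .sortBy { case (index, _) => index }
      .map { case (_, value) => value }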
diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala
index 114f8ec0408fa..7d457fc044227 100644
--- a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala
+++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/JavaOptionsTest.scala
@@ -29,12 +29,13 @@ private[spark] object JavaOptionsTest {
 
   def main(args: Array[String]): Unit = {
     // scalastyle:off println
-    if (args.length != 1) {
+    if (args.length != 2) {
       println(s"Invalid arguments: ${args.mkString(",")}." +
-        s"Usage: JavaOptionsTest <driver-java-options-list-file>")
+        s"Usage: JavaOptionsTest <driver-java-options-list-file> <executor-java-options-list-file>")
       System.exit(1)
     }
     val expectedDriverJavaOptions = loadPropertiesFromFile(args(0))
+    val expectedExecutorJavaOptions = loadPropertiesFromFile(args(1))
     val nonMatchingDriverOptions = expectedDriverJavaOptions.filter {
       case (optKey, optValue) => System.getProperty(optKey) != optValue
     }
@@ -42,15 +43,38 @@ private[spark] object JavaOptionsTest {
       println(s"The driver's JVM options did not match. Expected $expectedDriverJavaOptions." +
         s" But these options did not match: $nonMatchingDriverOptions.")
       val sysProps = Maps.fromProperties(System.getProperties).asScala
-      println("System properties are:")
+      println("Driver system properties are:")
       for (prop <- sysProps) {
         println(s"Key: ${prop._1}, Value: ${prop._2}")
       }
       System.exit(1)
     }
 
-    // TODO support spark.executor.extraJavaOptions and test here.
-    println(s"All expected JVM options were present on the driver and executors.")
+    val spark = SparkSession.builder().getOrCreate().sparkContext
+    try {
+      val nonMatchingExecutorOptions = spark.parallelize(Seq(0)).flatMap { _ =>
+        expectedExecutorJavaOptions.filter {
+          case (optKey, optValue) => System.getProperty(optKey) != optValue
+        }
+      }.collectAsMap()
+      if (nonMatchingExecutorOptions.nonEmpty) {
+        val executorSysProps = spark.parallelize(Seq(0)).flatMap { _ =>
+          Maps.fromProperties(System.getProperties).asScala
+        }.collectAsMap()
+        println(s"The executor's JVM options did not match. Expected" +
+          s" $expectedExecutorJavaOptions. But these options did not" +
+          s" match: $nonMatchingExecutorOptions.")
+        println("Executor system properties are:")
+        for (prop <- executorSysProps) {
+          println(s"Key: ${prop._1}, Value: ${prop._2}")
+        }
+        System.exit(1)
+      } else {
+        println("All expected JVM options were present on the driver and executors.")
+      }
+    } finally {
+      spark.stop()
+    }
 
     // scalastyle:on println
   }
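Note: the executor-side assertion works by shipping the expected properties
into a one-element RDD job, so the filter closure runs inside an executor JVM
where System.getProperty reflects the -D flags delivered through
SPARK_JAVA_OPT_*; both Spark jobs are launched before spark.stop() runs in the
finally block. The loadPropertiesFromFile helper sits outside this diff; a
plausible sketch of it, assuming a standard java.util.Properties file and the
Guava Maps conversion already used above:

    import java.io.FileInputStream
    import java.util.Properties
    import scala.collection.JavaConverters._
    import com.google.common.collect.Maps

    // Hypothetical reconstruction of the helper referenced by the test.
    def loadPropertiesFromFile(filePath: String): Map[String, String] = {
      val properties = new Properties()
      val input = new FileInputStream(filePath)
      try properties.load(input) finally input.close()
      Maps.fromProperties(properties).asScala.toMap
    }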
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 9c1f9775681e1..e204d0173aff8 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -234,18 +234,28 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     launchStagingServer(SSLOptions(), None)
     val driverJvmOptionsFile = storeJvmOptionsInTempFile(
       Map("simpleDriverConf" -> "simpleDriverConfValue",
-          "driverconfwithspaces" -> "driver conf with spaces value"),
+        "driverconfwithspaces" -> "driver conf with spaces value"),
       "driver-jvm-options.properties",
       "JVM options that should be set on the driver.")
+    val executorJvmOptionsFile = storeJvmOptionsInTempFile(
+      Map("simpleExecutorConf" -> "simpleExecutorConfValue",
+        "executor conf with spaces" -> "executor conf with spaces value"),
+      "executor-jvm-options.properties",
+      "JVM options that should be set on the executors.")
     sparkConf.set(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
       "-DsimpleDriverConf=simpleDriverConfValue" +
         " -Ddriverconfwithspaces='driver conf with spaces value'")
-    sparkConf.set("spark.files", driverJvmOptionsFile.getAbsolutePath)
+    sparkConf.set(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS,
+      "-DsimpleExecutorConf=simpleExecutorConfValue" +
+        " -D\'executor conf with spaces\'=\'executor conf with spaces value\'")
+    sparkConf.set("spark.files",
+      Seq(driverJvmOptionsFile.getAbsolutePath, executorJvmOptionsFile.getAbsolutePath)
+        .mkString(","))
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       JAVA_OPTIONS_MAIN_CLASS,
       Seq(s"All expected JVM options were present on the driver and executors."),
-      Array(driverJvmOptionsFile.getName),
+      Array(driverJvmOptionsFile.getName, executorJvmOptionsFile.getName),
       Seq.empty[String])
   }
 
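Note: the suite pushes each expected key/value pair through two channels that
must agree end to end: the extraJavaOptions string, which exercises
splitCommandString's quote handling (including a property key that itself
contains spaces), and a properties file shipped via spark.files for
JavaOptionsTest to read back. storeJvmOptionsInTempFile is defined elsewhere
in the suite; a plausible sketch of what it does:

    import java.io.{File, FileOutputStream}
    import java.nio.file.Files
    import java.util.Properties

    // Hypothetical reconstruction: persist the expected options as a
    // properties file that spark.files ships into the driver and executor
    // working directories.
    def storeJvmOptionsInTempFile(
        options: Map[String, String],
        propertiesFileName: String,
        comments: String): File = {
      val tempDir = Files.createTempDirectory("java-options").toFile
      val propertiesFile = new File(tempDir, propertiesFileName)
      val properties = new Properties()
      options.foreach { case (key, value) => properties.setProperty(key, value) }
      val output = new FileOutputStream(propertiesFile)
      try properties.store(output, comments) finally output.close()
      propertiesFile
    }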