From 8148f49e31b30edaa03973198042cb044df46cae Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 29 Oct 2018 16:28:36 -0700
Subject: [PATCH 1/7] [SPARK-25875][k8s] Merge code to set up driver command
 into a single step.

Right now there are 3 different classes dealing with building the driver
command to run inside the pod, one for each "binding" supported by Spark.
This has two main shortcomings:

- the code in the 3 classes is very similar; changing things in one place
  would probably mean making a similar change in the others.

- it gives the false impression that the step implementation is the only
  place where binding-specific logic is needed. That is not true; there
  was binding-specific code in KubernetesConf, and there is also some in
  the executor-specific config step. So the 3 classes weren't really
  working as a language-specific abstraction.

On top of that, the current code propagates command-line parameters in a
different way depending on the binding. That doesn't seem necessary, and
in fact using environment variables for command-line parameters is in
general a really bad idea, since special characters (e.g. spaces) can't
be handled that way.

This change merges the 3 different code paths for Java, Python and R into
a single step, and also merges the 3 code paths that start the Spark
driver in the k8s entry point script. This increases the amount of shared
code, and also moves more feature logic into the step itself, so it
doesn't live in KubernetesConf.

Note that not all logic related to setting up the driver lives in that
step. For example, the memory overhead calculation still lives separately,
except it now happens in the driver config step instead of outside the
step hierarchy altogether.

Some of the noise in the diff is because of changes to KubernetesConf,
which will be addressed in a separate change.

Tested with new and updated unit tests + integration tests.
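A minimal sketch of the argument-propagation problem described above, assuming
only standard Scala (the `ArgPropagation` object and its argument values are
illustrative, not part of this patch):

```scala
// The old Python/R bindings exported app args as one env var
// (e.g. ENV_PYSPARK_ARGS = appArgs.mkString(" ")), and the entrypoint
// script re-split that string on whitespace. Any argument containing a
// space is corrupted by that round trip; discrete container args are not.
object ArgPropagation {
  def main(args: Array[String]): Unit = {
    val appArgs = Seq("--input", "/data/my file.txt", "--retries", "3")

    // Old approach: flatten to a single string, then re-split.
    val flattened = appArgs.mkString(" ")
    val reSplit = flattened.split(" ").toSeq
    assert(reSplit != appArgs) // "/data/my" and "file.txt" became two args

    // New approach: each element is added as its own container arg
    // (ContainerBuilder.addToArgs(appArgs: _*)), preserving boundaries.
    val containerArgs = Seq("driver", "--class", "SomeMainClass") ++ appArgs
    assert(containerArgs.containsSlice(appArgs))
  }
}
```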
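And a condensed sketch of the shape of the merged step, based on the
DriverCommandFeatureStep added below (the real code resolves file URIs and
uses the SPARK_CONF_PATH and SparkLauncher.NO_RESOURCE constants; the literal
values here are stand-ins):

```scala
// Condensed from DriverCommandFeatureStep: one pattern match on the main
// app resource picks the driver command, replacing the three parallel
// binding classes (and the driver/driver-py/driver-r entrypoint modes).
object DriverCommandSketch {
  sealed trait MainAppResource
  case class JavaMainAppResource(res: String) extends MainAppResource
  case class PythonMainAppResource(res: String) extends MainAppResource
  case class RMainAppResource(res: String) extends MainAppResource

  def driverArgs(
      mainAppResource: Option[MainAppResource],
      mainClass: String,
      appArgs: Seq[String]): Seq[String] = {
    // Stand-in for the SPARK_CONF_PATH constant.
    val sparkConfPath = "/opt/spark/conf/spark.properties"
    val resource = mainAppResource match {
      // The app jar is already carried in spark.jars, so the Java case uses
      // a "blank" resource ("spark-internal", aka SparkLauncher.NO_RESOURCE).
      case Some(JavaMainAppResource(_)) | None => "spark-internal"
      case Some(PythonMainAppResource(res)) => res // real code: resolveFileUri
      case Some(RMainAppResource(res)) => res      // real code: resolveFileUri
    }
    Seq("driver", "--properties-file", sparkConfPath,
      "--class", mainClass, resource) ++ appArgs
  }
}
```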
--- .../org/apache/spark/deploy/k8s/Config.scala | 28 ---- .../apache/spark/deploy/k8s/Constants.scala | 5 +- .../spark/deploy/k8s/KubernetesConf.scala | 62 +------ .../k8s/features/BasicDriverFeatureStep.scala | 43 +++-- .../features/BasicExecutorFeatureStep.scala | 21 +-- .../features/DriverCommandFeatureStep.scala | 137 ++++++++++++++++ .../KubernetesFeatureConfigStep.scala | 4 +- .../bindings/JavaDriverFeatureStep.scala | 46 ------ .../bindings/PythonDriverFeatureStep.scala | 76 --------- .../bindings/RDriverFeatureStep.scala | 62 ------- .../k8s/submit/KubernetesDriverBuilder.scala | 26 +-- .../deploy/k8s/KubernetesConfSuite.scala | 98 ----------- .../BasicDriverFeatureStepSuite.scala | 65 ++++++-- .../BasicExecutorFeatureStepSuite.scala | 4 - .../DriverCommandFeatureStepSuite.scala | 155 ++++++++++++++++++ ...ubernetesCredentialsFeatureStepSuite.scala | 3 - .../DriverServiceFeatureStepSuite.scala | 6 - .../features/EnvSecretsFeatureStepSuite.scala | 1 - .../features/LocalDirsFeatureStepSuite.scala | 1 - .../MountSecretsFeatureStepSuite.scala | 1 - .../MountVolumesFeatureStepSuite.scala | 1 - .../bindings/JavaDriverFeatureStepSuite.scala | 61 ------- .../PythonDriverFeatureStepSuite.scala | 112 ------------- .../bindings/RDriverFeatureStepSuite.scala | 64 -------- .../spark/deploy/k8s/submit/ClientSuite.scala | 1 - .../submit/KubernetesDriverBuilderSuite.scala | 115 +------------ .../k8s/KubernetesExecutorBuilderSuite.scala | 5 - .../src/main/dockerfiles/spark/entrypoint.sh | 16 -- 28 files changed, 404 insertions(+), 815 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala delete mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala delete mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala delete mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala delete mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index fff8fa4340c35..ba1b2ff94bb1e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -125,34 +125,6 @@ private[spark] object Config extends Logging { .stringConf .createOptional - val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE = - ConfigBuilder("spark.kubernetes.python.mainAppResource") - .doc("The main app resource for pyspark jobs") - .internal() - .stringConf - .createOptional - - val KUBERNETES_PYSPARK_APP_ARGS = - ConfigBuilder("spark.kubernetes.python.appArgs") - .doc("The app arguments for 
PySpark Jobs") - .internal() - .stringConf - .createOptional - - val KUBERNETES_R_MAIN_APP_RESOURCE = - ConfigBuilder("spark.kubernetes.r.mainAppResource") - .doc("The main app resource for SparkR jobs") - .internal() - .stringConf - .createOptional - - val KUBERNETES_R_APP_ARGS = - ConfigBuilder("spark.kubernetes.r.appArgs") - .doc("The app arguments for SparkR Jobs") - .internal() - .stringConf - .createOptional - val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 172a9054bb4f2..20c57fc68640a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -69,17 +69,14 @@ private[spark] object Constants { val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" // BINDINGS - val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" val ENV_PYSPARK_FILES = "PYSPARK_FILES" - val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS" val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION" - val ENV_R_PRIMARY = "R_PRIMARY" - val ENV_R_ARGS = "R_APP_ARGS" // Miscellaneous val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc" val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" val MEMORY_OVERHEAD_MIN_MIB = 384L + val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d // Hadoop Configuration val HADOOP_FILE_VOLUME = "hadoop-properties" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 3e30ab2c8353e..c1a2ed6c72992 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -29,6 +29,7 @@ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.util.Utils private[spark] sealed trait KubernetesRoleSpecificConf @@ -40,7 +41,8 @@ private[spark] case class KubernetesDriverSpecificConf( mainAppResource: Option[MainAppResource], mainClass: String, appName: String, - appArgs: Seq[String]) extends KubernetesRoleSpecificConf + appArgs: Seq[String], + pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf /* * Structure containing metadata for Kubernetes logic that builds a Spark executor. 
@@ -71,7 +73,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleSecretEnvNamesToKeyRefs: Map[String, String], roleEnvs: Map[String, String], roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], - sparkFiles: Seq[String], hadoopConfSpec: Option[HadoopConfSpec]) { def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config" @@ -83,23 +84,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) - def sparkJars(): Seq[String] = sparkConf - .getOption("spark.jars") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) - - def pyFiles(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_PY_FILES) - - def pySparkMainResource(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE) - - def pySparkPythonVersion(): String = sparkConf - .get(PYSPARK_MAJOR_PYTHON_VERSION) - - def sparkRMainResource(): Option[String] = sparkConf - .get(KUBERNETES_R_MAIN_APP_RESOURCE) - def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) def imagePullSecrets(): Seq[LocalObjectReference] = { @@ -136,33 +120,6 @@ private[spark] object KubernetesConf { appArgs: Array[String], maybePyFiles: Option[String], hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { - val sparkConfWithMainAppJar = sparkConf.clone() - val additionalFiles = mutable.ArrayBuffer.empty[String] - mainAppResource.foreach { - case JavaMainAppResource(res) => - val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) - if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) - } - // The function of this outer match is to account for multiple nonJVM - // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4 - case nonJVM: NonJVMResource => - nonJVM match { - case PythonMainAppResource(res) => - additionalFiles += res - maybePyFiles.foreach{maybePyFiles => - additionalFiles.appendAll(maybePyFiles.split(","))} - sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) - case RMainAppResource(res) => - additionalFiles += res - sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res) - } - sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) - } - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + @@ -189,11 +146,6 @@ private[spark] object KubernetesConf { KubernetesVolumeUtils.parseVolumesWithPrefix( sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - val sparkFiles = sparkConf - .getOption("spark.files") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) ++ additionalFiles - val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) KubernetesUtils.requireNandDefined( hadoopConfDir, @@ -206,10 +158,12 @@ private[spark] object KubernetesConf { } else { None } + val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + KubernetesConf( - sparkConfWithMainAppJar, - KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), + sparkConf.clone(), + KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles), appResourceNamePrefix, appId, driverLabels, @@ -218,7 +172,6 @@ private[spark] object KubernetesConf { driverSecretEnvNamesToKeyRefs, driverEnvs, 
driverVolumes, - sparkFiles, hadoopConfSpec) } @@ -275,7 +228,6 @@ private[spark] object KubernetesConf { executorEnvSecrets, executorEnv, executorVolumes, - Seq.empty[String], None) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 575bc54ffe2bb..2dd78c96a3028 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils private[spark] class BasicDriverFeatureStep( conf: KubernetesConf[KubernetesDriverSpecificConf]) @@ -47,10 +48,24 @@ private[spark] class BasicDriverFeatureStep( // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + + // The memory overhead factor to use. If the user has not set it, then use a different + // value for non-JVM apps. This value is propagated to executors. + private val overheadFactor = conf.roleSpecificConf.mainAppResource match { + case Some(_: NonJVMResource) => + if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) { + conf.get(MEMORY_OVERHEAD_FACTOR) + } else { + NON_JVM_MEMORY_OVERHEAD_FACTOR + } + + case _ => + conf.get(MEMORY_OVERHEAD_FACTOR) + } + private val memoryOverheadMiB = conf .get(DRIVER_MEMORY_OVERHEAD) - .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB)) + .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { @@ -134,20 +149,18 @@ private[spark] class BasicDriverFeatureStep( KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix, - KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true") - - val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath( - conf.sparkJars()) - val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath( - conf.sparkFiles) - if (resolvedSparkJars.nonEmpty) { - additionalProps.put("spark.jars", resolvedSparkJars.mkString(",")) - } - if (resolvedSparkFiles.nonEmpty) { - additionalProps.put("spark.files", resolvedSparkFiles.mkString(",")) + KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true", + MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString) + + Seq("spark.jars", "spark.files").foreach { key => + conf.getOption(key).foreach { value => + val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value)) + if (resolved.nonEmpty) { + additionalProps.put(key, resolved.mkString(",")) + } + } } + additionalProps.toMap } - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index d89995ba5e4f4..fc73f8ad977b3 100644 --- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -58,16 +58,13 @@ private[spark] class BasicExecutorFeatureStep( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB - private val executorMemoryTotal = kubernetesConf.sparkConf - .getOption(APP_RESOURCE_TYPE.key).map{ res => - val additionalPySparkMemory = res match { - case "python" => - kubernetesConf.sparkConf - .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) - case _ => 0 - } - executorMemoryWithOverhead + additionalPySparkMemory - }.getOrElse(executorMemoryWithOverhead) + private val executorMemoryTotal = kubernetesConf.get(APP_RESOURCE_TYPE) match { + case Some("python") => + executorMemoryWithOverhead + + kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) + + case _ => executorMemoryWithOverhead + } private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) private val executorCoresRequest = @@ -187,8 +184,4 @@ private[spark] class BasicExecutorFeatureStep( SparkPod(executorPod, containerWithLimitCores) } - - override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala new file mode 100644 index 0000000000000..5d72fac62c7ee --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder} + +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.util.Utils + +/** + * Creates the driver command for running the user app, and propagates needed configuration so + * executors can also find the app code. 
+ */ +private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf]) + extends KubernetesFeatureConfigStep { + + private val driverConf = conf.roleSpecificConf + + override def configurePod(pod: SparkPod): SparkPod = { + driverConf.mainAppResource match { + case Some(JavaMainAppResource(_)) | None => + configureForJava(pod) + + case Some(PythonMainAppResource(res)) => + configureForPython(pod, res) + + case Some(RMainAppResource(res)) => + configureForR(pod, res) + } + } + + override def getAdditionalPodSystemProperties(): Map[String, String] = { + driverConf.mainAppResource match { + case Some(JavaMainAppResource(res)) => + additionalJavaProperties(res) + + case Some(PythonMainAppResource(res)) => + additionalPythonProperties(res) + + case Some(RMainAppResource(res)) => + additionalRProperties(res) + + case None => + Map.empty + } + } + + private def configureForJava(pod: SparkPod): SparkPod = { + // The user application jar is merged into the spark.jars list and managed through that + // property, so use a "blank" resource for the Java driver. + val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build() + SparkPod(pod.pod, driverContainer) + } + + private def configureForPython(pod: SparkPod, res: String): SparkPod = { + val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) { + // Delineation by ":" is to append the PySpark Files to the PYTHONPATH + // of the respective PySpark pod + val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles) + Some(new EnvVarBuilder() + .withName(ENV_PYSPARK_FILES) + .withValue(resolved.mkString(":")) + .build()) + } else { + None + } + val pythonEnvs = + Seq(new EnvVarBuilder() + .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) + .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION)) + .build()) ++ + maybePythonFiles + + val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)) + .addAllToEnv(pythonEnvs.asJava) + .build() + + SparkPod(pod.pod, pythonContainer) + } + + private def configureForR(pod: SparkPod, res: String): SparkPod = { + val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build() + SparkPod(pod.pod, rContainer) + } + + private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = { + new ContainerBuilder(pod.container) + .addToArgs("driver") + .addToArgs("--properties-file", SPARK_CONF_PATH) + .addToArgs("--class", driverConf.mainClass) + .addToArgs(resource) + .addToArgs(driverConf.appArgs: _*) + } + + private def additionalJavaProperties(resource: String): Map[String, String] = { + resourceType("java") ++ mergeFileList("spark.jars", Seq(resource)) + } + + private def additionalPythonProperties(resource: String): Map[String, String] = { + resourceType("python") ++ + mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles) + } + + private def additionalRProperties(resource: String): Map[String, String] = { + resourceType("r") ++ mergeFileList("spark.files", Seq(resource)) + } + + private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = { + val existing = Utils.stringToSeq(conf.sparkConf.get(key, "")) + Map(key -> (existing ++ filesToAdd).distinct.mkString(",")) + } + + private def resourceType(resType: String): Map[String, String] = { + Map(APP_RESOURCE_TYPE.key -> resType) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala index 4c1be3bb13293..58cdaa3cadd6b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala @@ -61,11 +61,11 @@ private[spark] trait KubernetesFeatureConfigStep { /** * Return any system properties that should be set on the JVM in accordance to this feature. */ - def getAdditionalPodSystemProperties(): Map[String, String] + def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty /** * Return any additional Kubernetes resources that should be added to support this feature. Only * applicable when creating the driver in cluster mode. */ - def getAdditionalKubernetesResources(): Seq[HasMetadata] + def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala deleted file mode 100644 index 6f063b253cd73..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep -import org.apache.spark.launcher.SparkLauncher - -private[spark] class JavaDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val withDriverArgs = new ContainerBuilder(pod.container) - .addToArgs("driver") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. 
- .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*) - .build() - SparkPod(pod.pod, withDriverArgs) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "java") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala deleted file mode 100644 index cf0c03b22bd7e..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class PythonDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined") - // Delineation is done by " " because that is input into PythonRunner - val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - pyArgs => - new EnvVarBuilder() - .withName(ENV_PYSPARK_ARGS) - .withValue(pyArgs.mkString(" ")) - .build()) - val maybePythonFiles = kubernetesConf.pyFiles().map( - // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH - // of the respective PySpark pod - pyFiles => - new EnvVarBuilder() - .withName(ENV_PYSPARK_FILES) - .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(",")) - .mkString(":")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_PYSPARK_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get)) - .build(), - new EnvVarBuilder() - .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) - .withValue(kubernetesConf.pySparkPythonVersion()) - .build()) - val pythonEnvs = envSeq ++ - maybePythonArgs.toSeq ++ - maybePythonFiles.toSeq - - val withPythonPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(pythonEnvs.asJava) - 
.addToArgs("driver-py") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withPythonPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "python") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala deleted file mode 100644 index 1a7ef52fefe70..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class RDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined") - // Delineation is done by " " because that is input into RRunner - val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - rArgs => - new EnvVarBuilder() - .withName(ENV_R_ARGS) - .withValue(rArgs.mkString(" ")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_R_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get)) - .build()) - val rEnvs = envSeq ++ - maybeRArgs.toSeq - - val withRPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(rEnvs.asJava) - .addToArgs("driver-r") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withRPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "r") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index b0b53321abd25..7c01ab7a564ee 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf} import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} private[spark] class KubernetesDriverBuilder( provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = @@ -40,18 +39,10 @@ private[spark] class KubernetesDriverBuilder( provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), - providePythonStep: ( + provideDriverCommandStep: ( KubernetesConf[KubernetesDriverSpecificConf] - => PythonDriverFeatureStep) = - new PythonDriverFeatureStep(_), - provideRStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => RDriverFeatureStep) = - new RDriverFeatureStep(_), - provideJavaStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => JavaDriverFeatureStep) = - new JavaDriverFeatureStep(_), + => DriverCommandFeatureStep) = + new DriverCommandFeatureStep(_), provideHadoopGlobalStep: ( KubernetesConf[KubernetesDriverSpecificConf] => KerberosConfDriverFeatureStep) = @@ -75,21 +66,14 @@ private[spark] class KubernetesDriverBuilder( Seq(provideVolumesStep(kubernetesConf)) } else Nil - val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { - case JavaMainAppResource(_) => - provideJavaStep(kubernetesConf) - case PythonMainAppResource(_) => - providePythonStep(kubernetesConf) - case RMainAppResource(_) => - provideRStep(kubernetesConf)} - .getOrElse(provideJavaStep(kubernetesConf)) + val driverCommandStep = provideDriverCommandStep(kubernetesConf) val maybeHadoopConfigStep = kubernetesConf.hadoopConfSpec.map { _ => provideHadoopGlobalStep(kubernetesConf)} val allFeatures: Seq[KubernetesFeatureConfigStep] = - (baseFeatures :+ bindingsStep) ++ + baseFeatures ++ Seq(driverCommandStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature ++ maybeHadoopConfigStep.toSeq diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index bb2b94f9976e2..9db98dd80dae7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -70,104 +70,6 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.roleSpecificConf.appArgs === APP_ARGS) } - test("Creating driver conf with and without the main app jar influences spark.jars") { - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar")) - val kubernetesConfWithMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppJar, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = 
None, - hadoopConfDir = None) - assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars") - .split(",") - === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar")) - val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource = None, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1) - } - - test("Creating driver conf with a python primary file") { - val mainResourceFile = "local:///opt/spark/main.py" - val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py") - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example4.py") - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - Some(inputPyFiles.mkString(",")), - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles) - } - - test("Creating driver conf with a r primary file") { - val mainResourceFile = "local:///opt/spark/main.R" - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example2.R") - val mainAppResource = Some(RMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example2.R", mainResourceFile)) - } - - test("Testing explicit setting of memory overhead on non-JVM tasks") { - val sparkConf = new SparkConf(false) - .set(MEMORY_OVERHEAD_FACTOR, 0.3) - - val mainResourceFile = "local:///opt/spark/main.py" - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val conf = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) - } - test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") { val sparkConf = new SparkConf(false) .set(MEMORY_OVERHEAD_FACTOR, 0.3) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index eebdd157da638..e9127ac737522 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -62,8 +62,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") .set("spark.driver.cores", "2") .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") - .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L) + .set(DRIVER_MEMORY.key, "256M") + .set(DRIVER_MEMORY_OVERHEAD, 200L) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) val kubernetesConf = KubernetesConf( @@ -77,7 +77,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val featureStep = new BasicDriverFeatureStep(kubernetesConf) @@ -130,16 +129,17 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, - "spark.kubernetes.submitInDriver" -> "true") + "spark.kubernetes.submitInDriver" -> "true", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver:latest") val pythonSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver-py:latest") val javaKubernetesConf = KubernetesConf( javaSparkConf, @@ -156,7 +156,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val pythonKubernetesConf = KubernetesConf( @@ -174,7 +173,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) @@ -204,7 +202,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - allFiles, hadoopConfSpec = None) val step = new BasicDriverFeatureStep(kubernetesConf) @@ -215,10 +212,54 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - "spark.files" -> 
"https://localhost:9000/file1.txt,/opt/spark/file2.txt") + "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(additionalProperties === expectedSparkConf) } + // Memory overhead tests. Tuples are: + // test name, main resource, overhead factor, expected factor + Seq( + ("java w/o resource", None, None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("java w/ resource", Some(JavaMainAppResource(null)), None, + MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("python default", Some(PythonMainAppResource(null)), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), + ("python w/ override", Some(PythonMainAppResource(null)), Some(0.9d), 0.9d), + ("r default", Some(RMainAppResource(null)), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) + ).foreach { case (name, resource, factor, expectedFactor) => + test(s"memory overhead factor: $name") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } + val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource) + val conf = KubernetesConf( + sparkConf, + driverConf, + RESOURCE_NAME_PREFIX, + APP_ID, + DRIVER_LABELS, + DRIVER_ANNOTATIONS, + Map.empty, + Map.empty, + DRIVER_ENVS, + Nil, + hadoopConfSpec = None) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = pod.container.getResources.getRequests.get("memory").getAmount() + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + } + } + def containerPort(name: String, portNumber: Int): ContainerPort = new ContainerPortBuilder() .withName(name) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 41f34bd45cd5b..e9a16aab6ccc2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -91,7 +91,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -132,7 +131,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -154,7 +152,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map("qux" -> "quux"), Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -182,7 +179,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic 
executor + executorMemory = 1408 + 42 = 1450 diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala new file mode 100644 index 0000000000000..95ccbeb870a56 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.util.Utils + +class DriverCommandFeatureStepSuite extends SparkFunSuite { + + private val MAIN_CLASS = "mainClass" + + test("no resource defined runs java driver") { + val spec = applyFeatureStep(appArgs = Array("5 7")) + val container = spec.pod.container + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "spark-internal", "5 7")) + assert(spec.systemProperties.isEmpty) + } + + test("java resource") { + val mainResource = "local:///main.jar" + val spec = applyFeatureStep( + resource = Some(JavaMainAppResource(mainResource)), + appArgs = Array("5", "7")) + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "spark-internal", "5", "7")) + + val jars = Utils.stringToSeq(spec.systemProperties("spark.jars")) + assert(jars.toSet === Set(mainResource)) + } + + test("python resource with no extra files") { + val mainResource = "local:///main.py" + val sparkConf = new SparkConf(false) + .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") + + val spec = applyFeatureStep( + conf = sparkConf, + resource = Some(PythonMainAppResource(mainResource))) + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.py")) + val envs = spec.pod.container.getEnv.asScala + .map { env => (env.getName, env.getValue) } + .toMap + assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3") + + val files = Utils.stringToSeq(spec.systemProperties("spark.files")) + assert(files.toSet === Set(mainResource)) + } + + test("python resource with extra files") { + val expectedMainResource = "/main.py" + val expectedPySparkFiles = "/example2.py:/example3.py" + val filesInConf = Set("local:///example.py") + + val mainResource = s"local://$expectedMainResource" + 
val pyFiles = Seq("local:///example2.py", "local:///example3.py") + + val sparkConf = new SparkConf(false) + .set("spark.files", filesInConf.mkString(",")) + .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") + val spec = applyFeatureStep( + conf = sparkConf, + resource = Some(PythonMainAppResource(mainResource)), + appArgs = Array("5", "7", "9"), + pyFiles = pyFiles) + + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.py", "5", "7", "9")) + + val envs = spec.pod.container.getEnv.asScala + .map { env => (env.getName, env.getValue) } + .toMap + val expected = Map( + ENV_PYSPARK_FILES -> expectedPySparkFiles, + ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2") + assert(envs === expected) + + val files = Utils.stringToSeq(spec.systemProperties("spark.files")) + assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource)) + } + + test("R resource") { + val expectedMainResource = "/main.R" + val mainResource = s"local://$expectedMainResource" + + val spec = applyFeatureStep( + resource = Some(RMainAppResource(mainResource)), + appArgs = Array("5", "7", "9")) + + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.R", "5", "7", "9")) + } + + private def applyFeatureStep( + conf: SparkConf = new SparkConf(false), + resource: Option[MainAppResource] = None, + appArgs: Array[String] = Array(), + pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { + val driverConf = new KubernetesDriverSpecificConf( + resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles) + val kubernetesConf = KubernetesConf( + conf, + driverConf, + "resource-prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + hadoopConfSpec = None) + val step = new DriverCommandFeatureStep(kubernetesConf) + val pod = step.configurePod(SparkPod.initialPod()) + val props = step.getAdditionalPodSystemProperties() + KubernetesDriverSpec(pod, Nil, props) + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index 8675ceb48cf6d..36c6616a87b0a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -62,7 +62,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) @@ -95,7 +94,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) @@ -135,7 +133,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) val 
resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 5c3e801501513..78f5c7df19bdb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -68,7 +68,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) @@ -101,7 +100,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX @@ -124,7 +122,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val resolvedService = configurationStep .getAdditionalKubernetesResources() @@ -156,7 +153,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) val driverService = configurationStep @@ -185,7 +181,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) fail("The driver bind address should not be allowed.") @@ -212,7 +207,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) fail("The driver host address should not be allowed.") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index 43796b77efdc7..3d253079c3ce7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -46,7 +46,6 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ envVarsToKeys, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val step = new EnvSecretsFeatureStep(kubernetesConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 3a4e60547d7f2..2098ccaa5e246 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -48,7 +48,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { 
Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index 18e3d773f690d..1555f6a9c6527 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -44,7 +44,6 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val step = new MountSecretsFeatureStep(kubernetesConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 0d0a5fb951f64..a87ed44d3df34 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -36,7 +36,6 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Nil, hadoopConfSpec = None) test("Mounts hostPath volumes") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala deleted file mode 100644 index 9172e0c3dc408..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource - -class JavaDriverFeatureStepSuite extends SparkFunSuite { - - test("Java Step modifies container correctly") { - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.jar")), - "test-class", - "java-runner", - Seq("5 7")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new JavaDriverFeatureStep(kubernetesConf) - val driverPod = step.configurePod(baseDriverPod).pod - val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container - assert(driverContainerwithJavaStep.getArgs.size === 7) - val args = driverContainerwithJavaStep - .getArgs.asScala - assert(args === List( - "driver", - "--properties-file", SPARK_CONF_PATH, - "--class", "test-class", - "spark-internal", "5 7")) - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala deleted file mode 100644 index 2bcc6465b79d6..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource - -class PythonDriverFeatureStepSuite extends SparkFunSuite { - - test("Python Step modifies container correctly") { - val expectedMainResource = "/main.py" - val mainResource = "local:///main.py" - val pyFiles = Seq("local:///example2.py", "local:///example3.py") - val expectedPySparkFiles = - "/example2.py:/example3.py" - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource) - .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(",")) - .set("spark.files", "local:///example.py") - .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.py")), - "test-app", - "python-runner", - Seq("5", "7", "9")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new PythonDriverFeatureStep(kubernetesConf) - val driverPod = step.configurePod(baseDriverPod).pod - val driverContainerwithPySpark = step.configurePod(baseDriverPod).container - assert(driverContainerwithPySpark.getEnv.size === 4) - val envs = driverContainerwithPySpark - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource) - assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles) - assert(envs(ENV_PYSPARK_ARGS) === "5 7 9") - assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "2") - } - test("Python Step testing empty pyfiles") { - val mainResource = "local:///main.py" - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource) - .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.py")), - "test-class-py", - "python-runner", - Seq.empty[String]), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - val step = new PythonDriverFeatureStep(kubernetesConf) - val driverContainerwithPySpark = step.configurePod(baseDriverPod).container - val args = driverContainerwithPySpark - .getArgs.asScala - assert(driverContainerwithPySpark.getArgs.size === 5) - assert(args === List( - "driver-py", - "--properties-file", SPARK_CONF_PATH, - "--class", "test-class-py")) - val envs = driverContainerwithPySpark - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3") - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala deleted file mode 100644 index 17af6011a17d5..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.RMainAppResource - -class RDriverFeatureStepSuite extends SparkFunSuite { - - test("R Step modifies container correctly") { - val expectedMainResource = "/main.R" - val mainResource = "local:///main.R" - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - .set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(RMainAppResource(mainResource)), - "test-app", - "r-runner", - Seq("5", "7", "9")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Seq.empty, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new RDriverFeatureStep(kubernetesConf) - val driverContainerwithR = step.configurePod(baseDriverPod).container - assert(driverContainerwithR.getEnv.size === 2) - val envs = driverContainerwithR - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_R_PRIMARY) === expectedMainResource) - assert(envs(ENV_R_ARGS) === "5 7 9") - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index ae13df39b7a76..c3f5ad912b45d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -142,7 +142,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 051d7b6994f5d..9f828f644a547 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -28,9 +27,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val SERVICE_STEP_TYPE = "service" private val LOCAL_DIRS_STEP_TYPE = "local-dirs" private val SECRETS_STEP_TYPE = "mount-secrets" - private val JAVA_STEP_TYPE = "java-bindings" - private val R_STEP_TYPE = "r-bindings" - private val PYSPARK_STEP_TYPE = "pyspark-bindings" + private val DRIVER_CMD_STEP_TYPE = "driver-command" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" @@ -50,14 +47,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep]) - private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - JAVA_STEP_TYPE, classOf[JavaDriverFeatureStep]) - - private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep]) - - private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - R_STEP_TYPE, classOf[RDriverFeatureStep]) + private val driverCommandStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + DRIVER_CMD_STEP_TYPE, classOf[DriverCommandFeatureStep]) private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) @@ -77,9 +68,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => envSecretsStep, _ => localDirsStep, _ => mountVolumesStep, - _ => pythonStep, - _ => rStep, - _ => javaStep, + _ => driverCommandStep, _ => hadoopGlobalStep) test("Apply fundamental steps all the time.") { @@ -98,7 +87,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -106,7 +94,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE) + DRIVER_CMD_STEP_TYPE) } test("Apply secrets step if secrets are present.") { @@ -125,7 +113,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map("EnvName" -> "SecretName:secretKey"), Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -135,61 +122,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, ENV_SECRETS_STEP_TYPE, - JAVA_STEP_TYPE) - } - - 
test("Apply Java step if main resource is none.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - None, - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String], - hadoopConfSpec = None) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE) - } - - test("Apply Python step if main resource is python.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("example.py")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String], - hadoopConfSpec = None) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - PYSPARK_STEP_TYPE) + DRIVER_CMD_STEP_TYPE) } test("Apply volumes step if mounts are present.") { @@ -213,7 +146,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String], hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -222,34 +154,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE, - JAVA_STEP_TYPE) - } - - test("Apply R step if main resource is R.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - Some(RMainAppResource("example.R")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String], - hadoopConfSpec = None) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - R_STEP_TYPE) + DRIVER_CMD_STEP_TYPE) } test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") { @@ -268,7 +173,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = Some( HadoopConfSpec( Some("/var/hadoop-conf"), @@ -279,7 +183,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE, + DRIVER_CMD_STEP_TYPE, HADOOP_GLOBAL_STEP_TYPE) } @@ -299,7 +203,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = Some( HadoopConfSpec( None, @@ -310,7 +213,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE, + DRIVER_CMD_STEP_TYPE, HADOOP_GLOBAL_STEP_TYPE) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index b572dac2bf624..144c548d27d76 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -73,7 +73,6 @@ class 
KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) @@ -92,7 +91,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map("secret-name" -> "secret-key"), Map.empty, Nil, - Seq.empty[String], None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -120,7 +118,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String], None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -149,7 +146,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Some(HadoopConfSpec(Some("/var/hadoop-conf"), None))) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -177,7 +173,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Some(HadoopConfSpec(None, Some("pre-defined-onfigMapName")))) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 4958b7363fee0..2b2a4e4cf6bcc 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -96,22 +96,6 @@ case "$SPARK_K8S_CMD" in "$@" ) ;; - driver-py) - CMD=( - "$SPARK_HOME/bin/spark-submit" - --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" - --deploy-mode client - "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS - ) - ;; - driver-r) - CMD=( - "$SPARK_HOME/bin/spark-submit" - --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" - --deploy-mode client - "$@" $R_PRIMARY $R_ARGS - ) - ;; executor) CMD=( ${JAVA_HOME}/bin/java From 91a47418d7ee8c4d564a79fae6ffa3bc384b09cc Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 30 Oct 2018 17:18:04 -0700 Subject: [PATCH 2/7] Tweaks after merge. 
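
Mostly compile fixes: the pod template step merged in with method
definitions that now collide with default implementations in
KubernetesFeatureConfigStep, so they need explicit `override`
modifiers, and its builder tests need the new driver command step type
in place of the removed Java binding step.

As a rough illustration of why the modifiers are now mandatory (the
trait and class names below are made up for the sketch, not the real
API):

    trait FeatureStep {
      // A default implementation in the trait...
      def extraProperties(): Map[String, String] = Map.empty
    }

    class TemplateStep extends FeatureStep {
      // ...means any redefinition must say `override`, or scalac
      // rejects it with "needs `override' modifier".
      override def extraProperties(): Map[String, String] =
        Map("template.file" -> "/opt/spark/pod-template.yml")
    }
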
--- .../spark/deploy/k8s/features/PodTemplateConfigMapStep.scala | 4 ++-- .../deploy/k8s/features/PodTemplateConfigMapStepSuite.scala | 1 - .../deploy/k8s/submit/KubernetesDriverBuilderSuite.scala | 5 +---- .../cluster/k8s/KubernetesExecutorBuilderSuite.scala | 1 - 4 files changed, 3 insertions(+), 8 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index 96a8013246b74..28e2d1726ae27 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -54,11 +54,11 @@ private[spark] class PodTemplateConfigMapStep( SparkPod(podWithVolume, containerWithVolume) } - def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( + override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) - def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala index d7bbbd121af72..e7303b37f5b57 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -48,7 +48,6 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) templateFile = Files.createTempFile("pod-template", "yml").toFile templateFile.deleteOnExit() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 60c7c97f713f8..8710115e578f3 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -187,7 +187,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -195,7 +194,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE, + DRIVER_CMD_STEP_TYPE, TEMPLATE_VOLUME_STEP_TYPE) } @@ -296,7 +295,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], 
Option.empty) val driverSpec = KubernetesDriverBuilder .apply(kubernetesClient, sparkConf) @@ -329,7 +327,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) val exception = intercept[SparkException] { KubernetesDriverBuilder diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index c74a519aeb8f9..1fea08c37ccc6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -220,7 +220,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) val sparkPod = KubernetesExecutorBuilder .apply(kubernetesClient, sparkConf) From 1a61f72e1fdad6d7e71e0d8deceed39f57b2cc01 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 1 Nov 2018 10:59:09 -0700 Subject: [PATCH 3/7] match -> if --- .../deploy/k8s/features/BasicExecutorFeatureStep.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index edb6f161290ad..98b52f230a47f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -58,13 +58,13 @@ private[spark] class BasicExecutorFeatureStep( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB - private val executorMemoryTotal = kubernetesConf.get(APP_RESOURCE_TYPE) match { - case Some("python") => + private val executorMemoryTotal = + if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some("python")) { executorMemoryWithOverhead + kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) - - case _ => executorMemoryWithOverhead - } + } else { + executorMemoryWithOverhead + } private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) private val executorCoresRequest = From 91765abc11ff3a0d44357ddb140fc494125fc1bb Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 1 Nov 2018 14:13:59 -0700 Subject: [PATCH 4/7] Make mainAppResource not optional. 
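
With the bindings collapsed into a single step, an app with no main
resource is just another flavor of a Java app. So instead of threading
Option[MainAppResource] through every signature, make the field
required, let JavaMainAppResource carry an Option[String] for its
primary resource, and default ClientArguments to
JavaMainAppResource(None). Pattern matches on the resource then no
longer need a None case.

A minimal sketch of the resulting shape (same case class names as the
patch, everything else simplified):

    sealed trait MainAppResource
    sealed trait NonJVMResource

    case class JavaMainAppResource(primaryResource: Option[String])
      extends MainAppResource

    case class PythonMainAppResource(primaryResource: String)
      extends MainAppResource with NonJVMResource

    case class RMainAppResource(primaryResource: String)
      extends MainAppResource with NonJVMResource

    // Exhaustive match: no Option unwrapping, no wildcard case needed.
    def describe(res: MainAppResource): String = res match {
      case JavaMainAppResource(maybeJar) =>
        s"java: ${maybeJar.getOrElse("spark-internal")}"
      case PythonMainAppResource(py) => s"python: $py"
      case RMainAppResource(r) => s"r: $r"
    }
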
--- .../spark/deploy/k8s/KubernetesConf.scala | 10 +++++++--- .../k8s/features/BasicDriverFeatureStep.scala | 9 ++++----- .../k8s/features/DriverCommandFeatureStep.scala | 17 +++++++---------- .../submit/KubernetesClientApplication.scala | 10 +++++----- .../deploy/k8s/submit/MainAppResource.scala | 3 ++- .../spark/deploy/k8s/KubernetesConfSuite.scala | 5 ++--- .../features/BasicDriverFeatureStepSuite.scala | 16 +++++++--------- .../DriverCommandFeatureStepSuite.scala | 10 +++++----- .../DriverServiceFeatureStepSuite.scala | 13 +++++++------ .../features/LocalDirsFeatureStepSuite.scala | 3 ++- .../features/MountVolumesFeatureStepSuite.scala | 3 ++- .../PodTemplateConfigMapStepSuite.scala | 3 ++- .../spark/deploy/k8s/submit/ClientSuite.scala | 3 ++- .../submit/KubernetesDriverBuilderSuite.scala | 16 ++++++++-------- 14 files changed, 62 insertions(+), 59 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index c1a2ed6c72992..0846cb8c780b9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -38,11 +38,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf * Structure containing metadata for Kubernetes logic that builds a Spark driver. */ private[spark] case class KubernetesDriverSpecificConf( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appName: String, appArgs: Seq[String], - pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf + pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf { + + require(mainAppResource != null, "Main resource must be provided.") + +} /* * Structure containing metadata for Kubernetes logic that builds a Spark executor. @@ -115,7 +119,7 @@ private[spark] object KubernetesConf { appName: String, appResourceNamePrefix: String, appId: String, - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appArgs: Array[String], maybePyFiles: Option[String], diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 95d43cc5a12b3..5ddf73cb16a6f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -51,17 +51,16 @@ private[spark] class BasicDriverFeatureStep( // The memory overhead factor to use. If the user has not set it, then use a different // value for non-JVM apps. This value is propagated to executors. 
- private val overheadFactor = conf.roleSpecificConf.mainAppResource match { - case Some(_: NonJVMResource) => + private val overheadFactor = + if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) { if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) { conf.get(MEMORY_OVERHEAD_FACTOR) } else { NON_JVM_MEMORY_OVERHEAD_FACTOR } - - case _ => + } else { conf.get(MEMORY_OVERHEAD_FACTOR) - } + } private val memoryOverheadMiB = conf .get(DRIVER_MEMORY_OVERHEAD) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala index 5d72fac62c7ee..81b86d2fadf12 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -39,30 +39,27 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri override def configurePod(pod: SparkPod): SparkPod = { driverConf.mainAppResource match { - case Some(JavaMainAppResource(_)) | None => + case JavaMainAppResource(_) => configureForJava(pod) - case Some(PythonMainAppResource(res)) => + case PythonMainAppResource(res) => configureForPython(pod, res) - case Some(RMainAppResource(res)) => + case RMainAppResource(res) => configureForR(pod, res) } } override def getAdditionalPodSystemProperties(): Map[String, String] = { driverConf.mainAppResource match { - case Some(JavaMainAppResource(res)) => - additionalJavaProperties(res) + case JavaMainAppResource(res) => + res.map(additionalJavaProperties).getOrElse(Map.empty) - case Some(PythonMainAppResource(res)) => + case PythonMainAppResource(res) => additionalPythonProperties(res) - case Some(RMainAppResource(res)) => + case RMainAppResource(res) => additionalRProperties(res) - - case None => - Map.empty } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 4b58f8ba3c9bd..543d6b16d6ae2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -44,7 +44,7 @@ import org.apache.spark.util.Utils * @param maybePyFiles additional Python files via --py-files */ private[spark] case class ClientArguments( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, driverArgs: Array[String], maybePyFiles: Option[String], @@ -53,18 +53,18 @@ private[spark] case class ClientArguments( private[spark] object ClientArguments { def fromCommandLineArgs(args: Array[String]): ClientArguments = { - var mainAppResource: Option[MainAppResource] = None + var mainAppResource: MainAppResource = JavaMainAppResource(None) var mainClass: Option[String] = None val driverArgs = mutable.ArrayBuffer.empty[String] var maybePyFiles : Option[String] = None args.sliding(2, 2).toList.foreach { case Array("--primary-java-resource", primaryJavaResource: String) => - mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + mainAppResource = JavaMainAppResource(Some(primaryJavaResource)) case 
Array("--primary-py-file", primaryPythonResource: String) => - mainAppResource = Some(PythonMainAppResource(primaryPythonResource)) + mainAppResource = PythonMainAppResource(primaryPythonResource) case Array("--primary-r-file", primaryRFile: String) => - mainAppResource = Some(RMainAppResource(primaryRFile)) + mainAppResource = RMainAppResource(primaryRFile) case Array("--other-py-files", pyFiles: String) => maybePyFiles = Some(pyFiles) case Array("--main-class", clazz: String) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala index dd5a4549743df..a2e01fa2d9a0e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala @@ -20,7 +20,8 @@ private[spark] sealed trait MainAppResource private[spark] sealed trait NonJVMResource -private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource +private[spark] case class JavaMainAppResource(primaryResource: Option[String]) + extends MainAppResource private[spark] case class PythonMainAppResource(primaryResource: String) extends MainAppResource with NonJVMResource diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index 9db98dd80dae7..41ca8d186c17b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -56,7 +56,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, @@ -65,7 +65,6 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap) assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX) assert(conf.roleSpecificConf.appName === APP_NAME) - assert(conf.roleSpecificConf.mainAppResource.isEmpty) assert(conf.roleSpecificConf.mainClass === MAIN_CLASS) assert(conf.roleSpecificConf.appArgs === APP_ARGS) } @@ -94,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 0d52e21a359b3..1e7dfbeffdb24 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -52,7 +52,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { new LocalObjectReferenceBuilder().withName(secret).build() } private val emptyDriverSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), APP_NAME, MAIN_CLASS, 
APP_ARGS) @@ -144,7 +144,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val javaKubernetesConf = KubernetesConf( javaSparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("")), + JavaMainAppResource(None), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -161,7 +161,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { val pythonKubernetesConf = KubernetesConf( pythonSparkConf, KubernetesDriverSpecificConf( - Some(PythonMainAppResource("")), + PythonMainAppResource(""), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -220,12 +220,10 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { // Memory overhead tests. Tuples are: // test name, main resource, overhead factor, expected factor Seq( - ("java w/o resource", None, None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), - ("java w/ resource", Some(JavaMainAppResource(null)), None, - MEMORY_OVERHEAD_FACTOR.defaultValue.get), - ("python default", Some(PythonMainAppResource(null)), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), - ("python w/ override", Some(PythonMainAppResource(null)), Some(0.9d), 0.9d), - ("r default", Some(RMainAppResource(null)), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) + ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), + ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d), + ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) ).foreach { case (name, resource, factor, expectedFactor) => test(s"memory overhead factor: $name") { // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala index 95ccbeb870a56..7513ca9c3b8b2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -43,7 +43,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { test("java resource") { val mainResource = "local:///main.jar" val spec = applyFeatureStep( - resource = Some(JavaMainAppResource(mainResource)), + resource = JavaMainAppResource(Some(mainResource)), appArgs = Array("5", "7")) assert(spec.pod.container.getArgs.asScala === List( "driver", @@ -62,7 +62,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { val spec = applyFeatureStep( conf = sparkConf, - resource = Some(PythonMainAppResource(mainResource))) + resource = PythonMainAppResource(mainResource)) assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, @@ -90,7 +90,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") val spec = applyFeatureStep( conf = sparkConf, - resource = Some(PythonMainAppResource(mainResource)), + resource = PythonMainAppResource(mainResource), appArgs = Array("5", "7", "9"), pyFiles = pyFiles) @@ -117,7 +117,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { val mainResource = s"local://$expectedMainResource" val spec = applyFeatureStep( - resource = Some(RMainAppResource(mainResource)), + resource = RMainAppResource(mainResource), appArgs = Array("5", "7", "9")) 
assert(spec.pod.container.getArgs.asScala === List( @@ -129,7 +129,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { private def applyFeatureStep( conf: SparkConf = new SparkConf(false), - resource: Option[MainAppResource] = None, + resource: MainAppResource = JavaMainAppResource(None), appArgs: Array[String] = Array(), pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { val driverConf = new KubernetesDriverSpecificConf( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 78f5c7df19bdb..3c46667c3042e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.util.Clock class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { @@ -59,7 +60,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -91,7 +92,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) .set(KUBERNETES_NAMESPACE, "my-namespace"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -113,7 +114,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -144,7 +145,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -172,7 +173,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -198,7 +199,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 2098ccaa5e246..894d824999aac 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" @@ -36,7 +37,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { kubernetesConf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index a87ed44d3df34..2a957460ca8e0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.features import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class MountVolumesFeatureStepSuite extends SparkFunSuite { private val sparkConf = new SparkConf(false) private val emptyKubernetesConf = KubernetesConf( sparkConf = sparkConf, roleSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala index e7303b37f5b57..370948c9502e4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { private var sparkConf: SparkConf = _ @@ -36,7 +37,7 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { kubernetesConf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index c3f5ad912b45d..81e3822389f30 
100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class ClientSuite extends SparkFunSuite with BeforeAndAfter { @@ -133,7 +134,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { sparkConf = new SparkConf(false) kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf]( sparkConf, - KubernetesDriverSpecificConf(None, MAIN_CLASS, APP_NAME, APP_ARGS), + KubernetesDriverSpecificConf(JavaMainAppResource(None), MAIN_CLASS, APP_NAME, APP_ARGS), KUBERNETES_RESOURCE_PREFIX, APP_ID, Map.empty, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 8710115e578f3..fe900fda6e545 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -86,7 +86,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -112,7 +112,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -145,7 +145,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -175,7 +175,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -202,7 +202,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -232,7 +232,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -283,7 +283,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val kubernetesConf = new KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -315,7 +315,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val kubernetesConf = new KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), 
"test-app", "main", Seq.empty), From 5601d16fde34bb6b35d773466332b400905a8e30 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 1 Nov 2018 15:53:07 -0700 Subject: [PATCH 5/7] Remove obsolete test. --- .../DriverCommandFeatureStepSuite.scala | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala index 7513ca9c3b8b2..30672952aaf6f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -29,21 +29,10 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { private val MAIN_CLASS = "mainClass" - test("no resource defined runs java driver") { - val spec = applyFeatureStep(appArgs = Array("5 7")) - val container = spec.pod.container - assert(spec.pod.container.getArgs.asScala === List( - "driver", - "--properties-file", SPARK_CONF_PATH, - "--class", MAIN_CLASS, - "spark-internal", "5 7")) - assert(spec.systemProperties.isEmpty) - } - test("java resource") { val mainResource = "local:///main.jar" val spec = applyFeatureStep( - resource = JavaMainAppResource(Some(mainResource)), + JavaMainAppResource(Some(mainResource)), appArgs = Array("5", "7")) assert(spec.pod.container.getArgs.asScala === List( "driver", @@ -61,8 +50,8 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") val spec = applyFeatureStep( - conf = sparkConf, - resource = PythonMainAppResource(mainResource)) + PythonMainAppResource(mainResource), + conf = sparkConf) assert(spec.pod.container.getArgs.asScala === List( "driver", "--properties-file", SPARK_CONF_PATH, @@ -89,8 +78,8 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { .set("spark.files", filesInConf.mkString(",")) .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") val spec = applyFeatureStep( + PythonMainAppResource(mainResource), conf = sparkConf, - resource = PythonMainAppResource(mainResource), appArgs = Array("5", "7", "9"), pyFiles = pyFiles) @@ -117,7 +106,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { val mainResource = s"local://$expectedMainResource" val spec = applyFeatureStep( - resource = RMainAppResource(mainResource), + RMainAppResource(mainResource), appArgs = Array("5", "7", "9")) assert(spec.pod.container.getArgs.asScala === List( @@ -128,8 +117,8 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite { } private def applyFeatureStep( + resource: MainAppResource, conf: SparkConf = new SparkConf(false), - resource: MainAppResource = JavaMainAppResource(None), appArgs: Array[String] = Array(), pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { val driverConf = new KubernetesDriverSpecificConf( From 4152c76872f8c3145c1ac5038652ea8c4be13441 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 1 Nov 2018 18:32:18 -0700 Subject: [PATCH 6/7] Temporary debug change to see what's failing. 
--- .../apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index d8f3a6cec05c3..a8dc84a90df01 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -40,7 +40,8 @@ object ProcessUtils extends Logging { }) assert(proc.waitFor(timeout, TimeUnit.SECONDS), s"Timed out while executing ${fullCommand.mkString(" ")}") - assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}") + assert(proc.exitValue == 0, + s"Failed to execute ${fullCommand.mkString(" ")}:\n${outputLines.mkString("\n")}") outputLines } } From 0d755d6549c46332ee7fa54c335d5c6c19727671 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 2 Nov 2018 10:52:24 -0700 Subject: [PATCH 7/7] Add constants for resource types. --- .../src/main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 ++ .../main/scala/org/apache/spark/deploy/k8s/Constants.scala | 5 +++++ .../deploy/k8s/features/BasicExecutorFeatureStep.scala | 2 +- .../deploy/k8s/features/DriverCommandFeatureStep.scala | 6 +++--- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index fd772962b45c9..a32bd93bb65bc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s import java.util.concurrent.TimeUnit +import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.ConfigBuilder @@ -239,6 +240,7 @@ private[spark] object Config extends Logging { .doc("This sets the resource type internally") .internal() .stringConf + .checkValues(Set(APP_RESOURCE_TYPE_JAVA, APP_RESOURCE_TYPE_PYTHON, APP_RESOURCE_TYPE_R)) .createOptional val KUBERNETES_LOCAL_DIRS_TMPFS = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 5dc3c3842071c..85917b88e912a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -110,4 +110,9 @@ private[spark] object Constants { // Hadoop credentials secrets for the Spark app. val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials" val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret" + + // Application resource types. 
+ val APP_RESOURCE_TYPE_JAVA = "java" + val APP_RESOURCE_TYPE_PYTHON = "python" + val APP_RESOURCE_TYPE_R = "r" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 98b52f230a47f..7f397e6e84fa5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -59,7 +59,7 @@ private[spark] class BasicExecutorFeatureStep( MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB private val executorMemoryTotal = - if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some("python")) { + if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) { executorMemoryWithOverhead + kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0) } else { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala index 81b86d2fadf12..8b8f0d01d49f7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala @@ -111,16 +111,16 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDri } private def additionalJavaProperties(resource: String): Map[String, String] = { - resourceType("java") ++ mergeFileList("spark.jars", Seq(resource)) + resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource)) } private def additionalPythonProperties(resource: String): Map[String, String] = { - resourceType("python") ++ + resourceType(APP_RESOURCE_TYPE_PYTHON) ++ mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles) } private def additionalRProperties(resource: String): Map[String, String] = { - resourceType("r") ++ mergeFileList("spark.files", Seq(resource)) + resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource)) } private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = {