diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 862f1d63ed39f..a32bd93bb65bc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigBuilder
 
@@ -125,34 +126,6 @@ private[spark] object Config extends Logging {
     .stringConf
     .createOptional
 
-  val KUBERNETES_PYSPARK_MAIN_APP_RESOURCE =
-    ConfigBuilder("spark.kubernetes.python.mainAppResource")
-      .doc("The main app resource for pyspark jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_PYSPARK_APP_ARGS =
-    ConfigBuilder("spark.kubernetes.python.appArgs")
-      .doc("The app arguments for PySpark Jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_R_MAIN_APP_RESOURCE =
-    ConfigBuilder("spark.kubernetes.r.mainAppResource")
-      .doc("The main app resource for SparkR jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
-  val KUBERNETES_R_APP_ARGS =
-    ConfigBuilder("spark.kubernetes.r.appArgs")
-      .doc("The app arguments for SparkR Jobs")
-      .internal()
-      .stringConf
-      .createOptional
-
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
@@ -267,6 +240,7 @@ private[spark] object Config extends Logging {
       .doc("This sets the resource type internally")
       .internal()
       .stringConf
+      .checkValues(Set(APP_RESOURCE_TYPE_JAVA, APP_RESOURCE_TYPE_PYTHON, APP_RESOURCE_TYPE_R))
       .createOptional
 
   val KUBERNETES_LOCAL_DIRS_TMPFS =
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 1c6d53c16871e..85917b88e912a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -69,12 +69,8 @@ private[spark] object Constants {
   val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
 
   // BINDINGS
-  val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
   val ENV_PYSPARK_FILES = "PYSPARK_FILES"
-  val ENV_PYSPARK_ARGS = "PYSPARK_APP_ARGS"
   val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
-  val ENV_R_PRIMARY = "R_PRIMARY"
-  val ENV_R_ARGS = "R_APP_ARGS"
 
   // Pod spec templates
   val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"
@@ -88,6 +84,7 @@ private[spark] object Constants {
   val DEFAULT_DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val MEMORY_OVERHEAD_MIN_MIB = 384L
+  val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
 
   // Hadoop Configuration
   val HADOOP_FILE_VOLUME = "hadoop-properties"
@@ -113,4 +110,9 @@ private[spark] object Constants {
   // Hadoop credentials secrets for the Spark app.
   val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
   val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"
+
+  // Application resource types.
+ val APP_RESOURCE_TYPE_JAVA = "java" + val APP_RESOURCE_TYPE_PYTHON = "python" + val APP_RESOURCE_TYPE_R = "r" } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index 066547dcbb408..ebb81540bbbbe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManag import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.internal.config.ConfigEntry +import org.apache.spark.util.Utils private[spark] sealed trait KubernetesRoleSpecificConf @@ -36,10 +37,15 @@ private[spark] sealed trait KubernetesRoleSpecificConf * Structure containing metadata for Kubernetes logic that builds a Spark driver. */ private[spark] case class KubernetesDriverSpecificConf( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appName: String, - appArgs: Seq[String]) extends KubernetesRoleSpecificConf + appArgs: Seq[String], + pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf { + + require(mainAppResource != null, "Main resource must be provided.") + +} /* * Structure containing metadata for Kubernetes logic that builds a Spark executor. @@ -70,7 +76,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( roleSecretEnvNamesToKeyRefs: Map[String, String], roleEnvs: Map[String, String], roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], - sparkFiles: Seq[String], hadoopConfSpec: Option[HadoopConfSpec]) { def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config" @@ -82,23 +87,6 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) - def sparkJars(): Seq[String] = sparkConf - .getOption("spark.jars") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) - - def pyFiles(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_PY_FILES) - - def pySparkMainResource(): Option[String] = sparkConf - .get(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE) - - def pySparkPythonVersion(): String = sparkConf - .get(PYSPARK_MAJOR_PYTHON_VERSION) - - def sparkRMainResource(): Option[String] = sparkConf - .get(KUBERNETES_R_MAIN_APP_RESOURCE) - def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) def imagePullSecrets(): Seq[LocalObjectReference] = { @@ -130,38 +118,11 @@ private[spark] object KubernetesConf { appName: String, appResourceNamePrefix: String, appId: String, - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, appArgs: Array[String], maybePyFiles: Option[String], hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { - val sparkConfWithMainAppJar = sparkConf.clone() - val additionalFiles = mutable.ArrayBuffer.empty[String] - mainAppResource.foreach { - case JavaMainAppResource(res) => - val previousJars = sparkConf - .getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty) - if (!previousJars.contains(res)) { - sparkConfWithMainAppJar.setJars(previousJars ++ Seq(res)) - } - // The function of this outer match is to account 
for multiple nonJVM - // bindings that will all have increased default MEMORY_OVERHEAD_FACTOR to 0.4 - case nonJVM: NonJVMResource => - nonJVM match { - case PythonMainAppResource(res) => - additionalFiles += res - maybePyFiles.foreach{maybePyFiles => - additionalFiles.appendAll(maybePyFiles.split(","))} - sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) - case RMainAppResource(res) => - additionalFiles += res - sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res) - } - sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) - } - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + @@ -188,11 +149,6 @@ private[spark] object KubernetesConf { KubernetesVolumeUtils.parseVolumesWithPrefix( sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get) - val sparkFiles = sparkConf - .getOption("spark.files") - .map(str => str.split(",").toSeq) - .getOrElse(Seq.empty[String]) ++ additionalFiles - val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP) KubernetesUtils.requireNandDefined( hadoopConfDir, @@ -205,10 +161,12 @@ private[spark] object KubernetesConf { } else { None } + val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil) + KubernetesConf( - sparkConfWithMainAppJar, - KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), + sparkConf.clone(), + KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles), appResourceNamePrefix, appId, driverLabels, @@ -217,7 +175,6 @@ private[spark] object KubernetesConf { driverSecretEnvNamesToKeyRefs, driverEnvs, driverVolumes, - sparkFiles, hadoopConfSpec) } @@ -274,7 +231,6 @@ private[spark] object KubernetesConf { executorEnvSecrets, executorEnv, executorVolumes, - Seq.empty[String], None) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index 96b14a0d82b4c..5ddf73cb16a6f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -28,6 +28,7 @@ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI +import org.apache.spark.util.Utils private[spark] class BasicDriverFeatureStep( conf: KubernetesConf[KubernetesDriverSpecificConf]) @@ -47,10 +48,23 @@ private[spark] class BasicDriverFeatureStep( // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + + // The memory overhead factor to use. If the user has not set it, then use a different + // value for non-JVM apps. This value is propagated to executors. 
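
The net effect of the `overheadFactor` selection implemented in the lines below can be summarized with a small, self-contained sketch. `driverMemoryRequestMiB` and its parameters are illustrative framing, not code from this patch; `0.1` mirrors the default value of `spark.kubernetes.memoryOverheadFactor`:

```scala
// Sketch only: an explicit user-set factor always wins; otherwise non-JVM apps
// (PySpark, SparkR) default to a larger factor for off-heap interpreter memory.
val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
val DEFAULT_MEMORY_OVERHEAD_FACTOR = 0.1d
val MEMORY_OVERHEAD_MIN_MIB = 384L

def driverMemoryRequestMiB(
    driverMemoryMiB: Long,
    userFactor: Option[Double],  // explicit spark.kubernetes.memoryOverheadFactor, if any
    isNonJvm: Boolean): Long = {
  val factor = userFactor.getOrElse(
    if (isNonJvm) NON_JVM_MEMORY_OVERHEAD_FACTOR else DEFAULT_MEMORY_OVERHEAD_FACTOR)
  driverMemoryMiB + math.max((factor * driverMemoryMiB).toLong, MEMORY_OVERHEAD_MIN_MIB)
}

// A 4096 MiB PySpark driver with no override requests 4096 + 1638 = 5734 MiB.
assert(driverMemoryRequestMiB(4096L, None, isNonJvm = true) == 5734L)
```

Whatever factor is chosen is also exported through `MEMORY_OVERHEAD_FACTOR.key` in the pod system properties below, so executors inherit the same value.
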
+  private val overheadFactor =
+    if (conf.roleSpecificConf.mainAppResource.isInstanceOf[NonJVMResource]) {
+      if (conf.sparkConf.contains(MEMORY_OVERHEAD_FACTOR)) {
+        conf.get(MEMORY_OVERHEAD_FACTOR)
+      } else {
+        NON_JVM_MEMORY_OVERHEAD_FACTOR
+      }
+    } else {
+      conf.get(MEMORY_OVERHEAD_FACTOR)
+    }
+
   private val memoryOverheadMiB = conf
     .get(DRIVER_MEMORY_OVERHEAD)
-    .getOrElse(math.max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt,
-      MEMORY_OVERHEAD_MIN_MIB))
+    .getOrElse(math.max((overheadFactor * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))
   private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
 
   override def configurePod(pod: SparkPod): SparkPod = {
@@ -134,20 +148,18 @@
       KUBERNETES_DRIVER_POD_NAME.key -> driverPodName,
       "spark.app.id" -> conf.appId,
       KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix,
-      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true")
-
-    val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkJars())
-    val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(
-      conf.sparkFiles)
-    if (resolvedSparkJars.nonEmpty) {
-      additionalProps.put("spark.jars", resolvedSparkJars.mkString(","))
-    }
-    if (resolvedSparkFiles.nonEmpty) {
-      additionalProps.put("spark.files", resolvedSparkFiles.mkString(","))
+      KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
+      MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
+
+    Seq("spark.jars", "spark.files").foreach { key =>
+      conf.getOption(key).foreach { value =>
+        val resolved = KubernetesUtils.resolveFileUrisAndPath(Utils.stringToSeq(value))
+        if (resolved.nonEmpty) {
+          additionalProps.put(key, resolved.mkString(","))
+        }
+      }
     }
+
     additionalProps.toMap
   }
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 1dab2a834f3e7..7f397e6e84fa5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -58,16 +58,13 @@
     (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt,
     MEMORY_OVERHEAD_MIN_MIB))
   private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB
-  private val executorMemoryTotal = kubernetesConf.sparkConf
-    .getOption(APP_RESOURCE_TYPE.key).map{ res =>
-      val additionalPySparkMemory = res match {
-        case "python" =>
-          kubernetesConf.sparkConf
-            .get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
-        case _ => 0
-      }
-    executorMemoryWithOverhead + additionalPySparkMemory
-  }.getOrElse(executorMemoryWithOverhead)
+  private val executorMemoryTotal =
+    if (kubernetesConf.get(APP_RESOURCE_TYPE) == Some(APP_RESOURCE_TYPE_PYTHON)) {
+      executorMemoryWithOverhead +
+        kubernetesConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
+    } else {
+      executorMemoryWithOverhead
+    }
 
   private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
   private val executorCoresRequest =
@@ -187,8 +184,4 @@
 
     SparkPod(executorPod, containerWithLimitCores)
   }
-
-  override def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
-
-  override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
new file mode 100644
index 0000000000000..8b8f0d01d49f7
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder}
+
+import org.apache.spark.deploy.k8s._
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.Utils
+
+/**
+ * Creates the driver command for running the user app, and propagates needed configuration so
+ * executors can also find the app code.
+ */
+private[spark] class DriverCommandFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf])
+  extends KubernetesFeatureConfigStep {
+
+  private val driverConf = conf.roleSpecificConf
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    driverConf.mainAppResource match {
+      case JavaMainAppResource(_) =>
+        configureForJava(pod)
+
+      case PythonMainAppResource(res) =>
+        configureForPython(pod, res)
+
+      case RMainAppResource(res) =>
+        configureForR(pod, res)
+    }
+  }
+
+  override def getAdditionalPodSystemProperties(): Map[String, String] = {
+    driverConf.mainAppResource match {
+      case JavaMainAppResource(res) =>
+        res.map(additionalJavaProperties).getOrElse(Map.empty)
+
+      case PythonMainAppResource(res) =>
+        additionalPythonProperties(res)
+
+      case RMainAppResource(res) =>
+        additionalRProperties(res)
+    }
+  }
+
+  private def configureForJava(pod: SparkPod): SparkPod = {
+    // The user application jar is merged into the spark.jars list and managed through that
+    // property, so use a "blank" resource for the Java driver.
+    val driverContainer = baseDriverContainer(pod, SparkLauncher.NO_RESOURCE).build()
+    SparkPod(pod.pod, driverContainer)
+  }
+
+  private def configureForPython(pod: SparkPod, res: String): SparkPod = {
+    val maybePythonFiles = if (driverConf.pyFiles.nonEmpty) {
+      // The file list is joined with ":" so that it can be appended to the PYTHONPATH
+      // of the PySpark driver pod.
+      val resolved = KubernetesUtils.resolveFileUrisAndPath(driverConf.pyFiles)
+      Some(new EnvVarBuilder()
+        .withName(ENV_PYSPARK_FILES)
+        .withValue(resolved.mkString(":"))
+        .build())
+    } else {
+      None
+    }
+    val pythonEnvs =
+      Seq(new EnvVarBuilder()
+          .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
+          .withValue(conf.sparkConf.get(PYSPARK_MAJOR_PYTHON_VERSION))
+          .build()) ++
+        maybePythonFiles
+
+    val pythonContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res))
+      .addAllToEnv(pythonEnvs.asJava)
+      .build()
+
+    SparkPod(pod.pod, pythonContainer)
+  }
+
+  private def configureForR(pod: SparkPod, res: String): SparkPod = {
+    val rContainer = baseDriverContainer(pod, KubernetesUtils.resolveFileUri(res)).build()
+    SparkPod(pod.pod, rContainer)
+  }
+
+  private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
+    new ContainerBuilder(pod.container)
+      .addToArgs("driver")
+      .addToArgs("--properties-file", SPARK_CONF_PATH)
+      .addToArgs("--class", driverConf.mainClass)
+      .addToArgs(resource)
+      .addToArgs(driverConf.appArgs: _*)
+  }
+
+  private def additionalJavaProperties(resource: String): Map[String, String] = {
+    resourceType(APP_RESOURCE_TYPE_JAVA) ++ mergeFileList("spark.jars", Seq(resource))
+  }
+
+  private def additionalPythonProperties(resource: String): Map[String, String] = {
+    resourceType(APP_RESOURCE_TYPE_PYTHON) ++
+      mergeFileList("spark.files", Seq(resource) ++ driverConf.pyFiles)
+  }
+
+  private def additionalRProperties(resource: String): Map[String, String] = {
+    resourceType(APP_RESOURCE_TYPE_R) ++ mergeFileList("spark.files", Seq(resource))
+  }
+
+  private def mergeFileList(key: String, filesToAdd: Seq[String]): Map[String, String] = {
+    val existing = Utils.stringToSeq(conf.sparkConf.get(key, ""))
+    Map(key -> (existing ++ filesToAdd).distinct.mkString(","))
+  }
+
+  private def resourceType(resType: String): Map[String, String] = {
+    Map(APP_RESOURCE_TYPE.key -> resType)
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
index 4c1be3bb13293..58cdaa3cadd6b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/KubernetesFeatureConfigStep.scala
@@ -61,11 +61,11 @@
   /**
    * Return any system properties that should be set on the JVM in accordance to this feature.
    */
-  def getAdditionalPodSystemProperties(): Map[String, String]
+  def getAdditionalPodSystemProperties(): Map[String, String] = Map.empty
 
   /**
    * Return any additional Kubernetes resources that should be added to support this feature. Only
   * applicable when creating the driver in cluster mode.
*/ - def getAdditionalKubernetesResources(): Seq[HasMetadata] + def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala index 96a8013246b74..28e2d1726ae27 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStep.scala @@ -54,11 +54,11 @@ private[spark] class PodTemplateConfigMapStep( SparkPod(podWithVolume, containerWithVolume) } - def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( + override def getAdditionalPodSystemProperties(): Map[String, String] = Map[String, String]( KUBERNETES_EXECUTOR_PODTEMPLATE_FILE.key -> (EXECUTOR_POD_SPEC_TEMPLATE_MOUNTPATH + "/" + EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME)) - def getAdditionalKubernetesResources(): Seq[HasMetadata] = { + override def getAdditionalKubernetesResources(): Seq[HasMetadata] = { require(conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) val podTemplateFile = conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).get val podTemplateString = Files.toString(new File(podTemplateFile), StandardCharsets.UTF_8) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala deleted file mode 100644 index 6f063b253cd73..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStep.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
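
Since KubernetesFeatureConfigStep now provides defaults for both hooks, a feature step only has to override what it actually uses; this is also why BasicDriverFeatureStep and BasicExecutorFeatureStep can drop their empty implementations. A hypothetical minimal step (not part of this change) illustrating the reduced boilerplate:

```scala
package org.apache.spark.deploy.k8s.features

import org.apache.spark.deploy.k8s.SparkPod

// Hypothetical step: configurePod is the only override needed; the system
// property and Kubernetes resource hooks inherit Map.empty / Seq.empty.
private[spark] class NoOpFeatureStep extends KubernetesFeatureConfigStep {
  override def configurePod(pod: SparkPod): SparkPod = pod
}
```
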
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants.SPARK_CONF_PATH -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep -import org.apache.spark.launcher.SparkLauncher - -private[spark] class JavaDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val withDriverArgs = new ContainerBuilder(pod.container) - .addToArgs("driver") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", kubernetesConf.roleSpecificConf.mainClass) - // The user application jar is merged into the spark.jars list and managed through that - // property, so there is no need to reference it explicitly here. - .addToArgs(SparkLauncher.NO_RESOURCE) - .addToArgs(kubernetesConf.roleSpecificConf.appArgs: _*) - .build() - SparkPod(pod.pod, withDriverArgs) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "java") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala deleted file mode 100644 index cf0c03b22bd7e..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStep.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
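
The deleted per-binding steps each assembled the container command separately, through distinct entrypoint subcommands (driver, driver-py, driver-r); baseDriverContainer in DriverCommandFeatureStep now emits one layout, always through the driver entrypoint. A standalone sketch of that layout, where the confPath default is an illustrative stand-in for SPARK_CONF_PATH:

```scala
import org.apache.spark.launcher.SparkLauncher

// resource is SparkLauncher.NO_RESOURCE ("spark-internal") for Java apps, since
// the app jar travels via spark.jars; Python/R pass the resolved primary file.
def driverArgs(
    mainClass: String,
    resource: String,
    appArgs: Seq[String],
    confPath: String = "/opt/spark/conf/spark.properties"): Seq[String] =
  Seq("driver", "--properties-file", confPath, "--class", mainClass, resource) ++ appArgs

assert(driverArgs("org.example.Main", SparkLauncher.NO_RESOURCE, Seq("5", "7")) ==
  Seq("driver", "--properties-file", "/opt/spark/conf/spark.properties",
    "--class", "org.example.Main", "spark-internal", "5", "7"))
```

Note that app arguments now travel as container args for every binding, instead of the PYSPARK_APP_ARGS / R_APP_ARGS environment variables used by the deleted steps.
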
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class PythonDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "PySpark Main Resource must be defined") - // Delineation is done by " " because that is input into PythonRunner - val maybePythonArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - pyArgs => - new EnvVarBuilder() - .withName(ENV_PYSPARK_ARGS) - .withValue(pyArgs.mkString(" ")) - .build()) - val maybePythonFiles = kubernetesConf.pyFiles().map( - // Dilineation by ":" is to append the PySpark Files to the PYTHONPATH - // of the respective PySpark pod - pyFiles => - new EnvVarBuilder() - .withName(ENV_PYSPARK_FILES) - .withValue(KubernetesUtils.resolveFileUrisAndPath(pyFiles.split(",")) - .mkString(":")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_PYSPARK_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.pySparkMainResource().get)) - .build(), - new EnvVarBuilder() - .withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION) - .withValue(kubernetesConf.pySparkPythonVersion()) - .build()) - val pythonEnvs = envSeq ++ - maybePythonArgs.toSeq ++ - maybePythonFiles.toSeq - - val withPythonPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(pythonEnvs.asJava) - .addToArgs("driver-py") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withPythonPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "python") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala deleted file mode 100644 index 1a7ef52fefe70..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStep.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
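
Instead of passing the primary resource through the PYSPARK_PRIMARY / R_PRIMARY environment variables as these deleted steps did, the consolidated step folds resources into spark.files / spark.jars via mergeFileList. A standalone approximation of that helper (the real code splits with Utils.stringToSeq, which also trims entries):

```scala
// Keep existing entries, append the new ones, drop duplicates.
def mergeFileList(existing: String, filesToAdd: Seq[String]): String = {
  val current = existing.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  (current ++ filesToAdd).distinct.mkString(",")
}

// The primary resource is added exactly once, even if already listed.
assert(mergeFileList("local:///example.py", Seq("local:///main.py", "local:///example.py")) ==
  "local:///example.py,local:///main.py")
```
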
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, HasMetadata} - -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesUtils, SparkPod} -import org.apache.spark.deploy.k8s.Config.APP_RESOURCE_TYPE -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep - -private[spark] class RDriverFeatureStep( - kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf]) - extends KubernetesFeatureConfigStep { - override def configurePod(pod: SparkPod): SparkPod = { - val roleConf = kubernetesConf.roleSpecificConf - require(roleConf.mainAppResource.isDefined, "R Main Resource must be defined") - // Delineation is done by " " because that is input into RRunner - val maybeRArgs = Option(roleConf.appArgs).filter(_.nonEmpty).map( - rArgs => - new EnvVarBuilder() - .withName(ENV_R_ARGS) - .withValue(rArgs.mkString(" ")) - .build()) - val envSeq = - Seq(new EnvVarBuilder() - .withName(ENV_R_PRIMARY) - .withValue(KubernetesUtils.resolveFileUri(kubernetesConf.sparkRMainResource().get)) - .build()) - val rEnvs = envSeq ++ - maybeRArgs.toSeq - - val withRPrimaryContainer = new ContainerBuilder(pod.container) - .addAllToEnv(rEnvs.asJava) - .addToArgs("driver-r") - .addToArgs("--properties-file", SPARK_CONF_PATH) - .addToArgs("--class", roleConf.mainClass) - .build() - - SparkPod(pod.pod, withRPrimaryContainer) - } - override def getAdditionalPodSystemProperties(): Map[String, String] = - Map(APP_RESOURCE_TYPE.key -> "r") - - override def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty -} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala index 4b58f8ba3c9bd..543d6b16d6ae2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala @@ -44,7 +44,7 @@ import org.apache.spark.util.Utils * @param maybePyFiles additional Python files via --py-files */ private[spark] case class ClientArguments( - mainAppResource: Option[MainAppResource], + mainAppResource: MainAppResource, mainClass: String, driverArgs: Array[String], maybePyFiles: Option[String], @@ -53,18 +53,18 @@ private[spark] case class ClientArguments( private[spark] object ClientArguments { def fromCommandLineArgs(args: Array[String]): ClientArguments = { - var mainAppResource: Option[MainAppResource] = None + var mainAppResource: MainAppResource = JavaMainAppResource(None) var mainClass: Option[String] = None val driverArgs = mutable.ArrayBuffer.empty[String] var maybePyFiles : Option[String] = None args.sliding(2, 2).toList.foreach { case Array("--primary-java-resource", primaryJavaResource: String) => - mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + mainAppResource = JavaMainAppResource(Some(primaryJavaResource)) case Array("--primary-py-file", primaryPythonResource: String) => - mainAppResource = Some(PythonMainAppResource(primaryPythonResource)) + mainAppResource = 
PythonMainAppResource(primaryPythonResource) case Array("--primary-r-file", primaryRFile: String) => - mainAppResource = Some(RMainAppResource(primaryRFile)) + mainAppResource = RMainAppResource(primaryRFile) case Array("--other-py-files", pyFiles: String) => maybePyFiles = Some(pyFiles) case Array("--main-class", clazz: String) => diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala index 5565cd74280e6..be4daec3b1bb9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.{Config, KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkPod} import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} private[spark] class KubernetesDriverBuilder( provideBasicStep: (KubernetesConf[KubernetesDriverSpecificConf]) => BasicDriverFeatureStep = @@ -45,18 +44,10 @@ private[spark] class KubernetesDriverBuilder( provideVolumesStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf] => MountVolumesFeatureStep) = new MountVolumesFeatureStep(_), - providePythonStep: ( + provideDriverCommandStep: ( KubernetesConf[KubernetesDriverSpecificConf] - => PythonDriverFeatureStep) = - new PythonDriverFeatureStep(_), - provideRStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => RDriverFeatureStep) = - new RDriverFeatureStep(_), - provideJavaStep: ( - KubernetesConf[KubernetesDriverSpecificConf] - => JavaDriverFeatureStep) = - new JavaDriverFeatureStep(_), + => DriverCommandFeatureStep) = + new DriverCommandFeatureStep(_), provideHadoopGlobalStep: ( KubernetesConf[KubernetesDriverSpecificConf] => KerberosConfDriverFeatureStep) = @@ -88,21 +79,14 @@ private[spark] class KubernetesDriverBuilder( Seq(providePodTemplateConfigMapStep(kubernetesConf)) } else Nil - val bindingsStep = kubernetesConf.roleSpecificConf.mainAppResource.map { - case JavaMainAppResource(_) => - provideJavaStep(kubernetesConf) - case PythonMainAppResource(_) => - providePythonStep(kubernetesConf) - case RMainAppResource(_) => - provideRStep(kubernetesConf)} - .getOrElse(provideJavaStep(kubernetesConf)) + val driverCommandStep = provideDriverCommandStep(kubernetesConf) val maybeHadoopConfigStep = kubernetesConf.hadoopConfSpec.map { _ => provideHadoopGlobalStep(kubernetesConf)} val allFeatures: Seq[KubernetesFeatureConfigStep] = - (baseFeatures :+ bindingsStep) ++ + baseFeatures ++ Seq(driverCommandStep) ++ secretFeature ++ envSecretFeature ++ volumesFeature ++ maybeHadoopConfigStep.toSeq ++ podTemplateFeature diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala index dd5a4549743df..a2e01fa2d9a0e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MainAppResource.scala @@ -20,7 +20,8 @@ private[spark] sealed trait MainAppResource private[spark] sealed trait NonJVMResource -private[spark] case class JavaMainAppResource(primaryResource: String) extends MainAppResource +private[spark] case class JavaMainAppResource(primaryResource: Option[String]) + extends MainAppResource private[spark] case class PythonMainAppResource(primaryResource: String) extends MainAppResource with NonJVMResource diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala index bb2b94f9976e2..41ca8d186c17b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala @@ -56,7 +56,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, @@ -65,109 +65,10 @@ class KubernetesConfSuite extends SparkFunSuite { assert(conf.sparkConf.getAll.toMap === sparkConf.getAll.toMap) assert(conf.appResourceNamePrefix === RESOURCE_NAME_PREFIX) assert(conf.roleSpecificConf.appName === APP_NAME) - assert(conf.roleSpecificConf.mainAppResource.isEmpty) assert(conf.roleSpecificConf.mainClass === MAIN_CLASS) assert(conf.roleSpecificConf.appArgs === APP_ARGS) } - test("Creating driver conf with and without the main app jar influences spark.jars") { - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - val mainAppJar = Some(JavaMainAppResource("local:///opt/spark/main.jar")) - val kubernetesConfWithMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppJar, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainJar.sparkConf.get("spark.jars") - .split(",") - === Array("local:///opt/spark/jar1.jar", "local:///opt/spark/main.jar")) - val kubernetesConfWithoutMainJar = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource = None, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithoutMainJar.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithoutMainJar.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.1) - } - - test("Creating driver conf with a python primary file") { - val mainResourceFile = "local:///opt/spark/main.py" - val inputPyFiles = Array("local:///opt/spark/example2.py", "local:///example3.py") - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example4.py") - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - Some(inputPyFiles.mkString(",")), - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 
0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example4.py", mainResourceFile) ++ inputPyFiles) - } - - test("Creating driver conf with a r primary file") { - val mainResourceFile = "local:///opt/spark/main.R" - val sparkConf = new SparkConf(false) - .setJars(Seq("local:///opt/spark/jar1.jar")) - .set("spark.files", "local:///opt/spark/example2.R") - val mainAppResource = Some(RMainAppResource(mainResourceFile)) - val kubernetesConfWithMainResource = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(kubernetesConfWithMainResource.sparkConf.get("spark.jars").split(",") - === Array("local:///opt/spark/jar1.jar")) - assert(kubernetesConfWithMainResource.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.4) - assert(kubernetesConfWithMainResource.sparkFiles - === Array("local:///opt/spark/example2.R", mainResourceFile)) - } - - test("Testing explicit setting of memory overhead on non-JVM tasks") { - val sparkConf = new SparkConf(false) - .set(MEMORY_OVERHEAD_FACTOR, 0.3) - - val mainResourceFile = "local:///opt/spark/main.py" - val mainAppResource = Some(PythonMainAppResource(mainResourceFile)) - val conf = KubernetesConf.createDriverConf( - sparkConf, - APP_NAME, - RESOURCE_NAME_PREFIX, - APP_ID, - mainAppResource, - MAIN_CLASS, - APP_ARGS, - maybePyFiles = None, - hadoopConfDir = None) - assert(conf.sparkConf.get(MEMORY_OVERHEAD_FACTOR) === 0.3) - } - test("Resolve driver labels, annotations, secret mount paths, envs, and memory overhead") { val sparkConf = new SparkConf(false) .set(MEMORY_OVERHEAD_FACTOR, 0.3) @@ -192,7 +93,7 @@ class KubernetesConfSuite extends SparkFunSuite { APP_NAME, RESOURCE_NAME_PREFIX, APP_ID, - mainAppResource = None, + mainAppResource = JavaMainAppResource(None), MAIN_CLASS, APP_ARGS, maybePyFiles = None, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 5c6bcc72158be..1e7dfbeffdb24 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -24,8 +24,8 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.JavaMainAppResource -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -52,7 +52,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { new LocalObjectReferenceBuilder().withName(secret).build() } private val emptyDriverSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), APP_NAME, MAIN_CLASS, APP_ARGS) @@ -62,8 +62,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") .set("spark.driver.cores", "2") .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") - 
.set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") - .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L) + .set(DRIVER_MEMORY.key, "256M") + .set(DRIVER_MEMORY_OVERHEAD, 200L) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS.mkString(",")) val kubernetesConf = KubernetesConf( @@ -77,7 +77,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val featureStep = new BasicDriverFeatureStep(kubernetesConf) @@ -130,21 +129,22 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, - "spark.kubernetes.submitInDriver" -> "true") + "spark.kubernetes.submitInDriver" -> "true", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } test("Check appropriate entrypoint rerouting for various bindings") { val javaSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver:latest") val pythonSparkConf = new SparkConf() - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "4g") + .set(DRIVER_MEMORY.key, "4g") .set(CONTAINER_IMAGE, "spark-driver-py:latest") val javaKubernetesConf = KubernetesConf( javaSparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("")), + JavaMainAppResource(None), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -156,13 +156,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val pythonKubernetesConf = KubernetesConf( pythonSparkConf, KubernetesDriverSpecificConf( - Some(PythonMainAppResource("")), + PythonMainAppResource(""), APP_NAME, PY_MAIN_CLASS, APP_ARGS), @@ -174,7 +173,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - Seq.empty[String], hadoopConfSpec = None) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) @@ -204,7 +202,6 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Map.empty, DRIVER_ENVS, Nil, - allFiles, hadoopConfSpec = None) val step = new BasicDriverFeatureStep(kubernetesConf) @@ -215,10 +212,52 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") + "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt", + MEMORY_OVERHEAD_FACTOR.key -> MEMORY_OVERHEAD_FACTOR.defaultValue.get.toString) assert(additionalProperties === expectedSparkConf) } + // Memory overhead tests. 
Tuples are: + // test name, main resource, overhead factor, expected factor + Seq( + ("java", JavaMainAppResource(None), None, MEMORY_OVERHEAD_FACTOR.defaultValue.get), + ("python default", PythonMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR), + ("python w/ override", PythonMainAppResource(null), Some(0.9d), 0.9d), + ("r default", RMainAppResource(null), None, NON_JVM_MEMORY_OVERHEAD_FACTOR) + ).foreach { case (name, resource, factor, expectedFactor) => + test(s"memory overhead factor: $name") { + // Choose a driver memory where the default memory overhead is > MEMORY_OVERHEAD_MIN_MIB + val driverMem = MEMORY_OVERHEAD_MIN_MIB / MEMORY_OVERHEAD_FACTOR.defaultValue.get * 2 + + // main app resource, overhead factor + val sparkConf = new SparkConf(false) + .set(CONTAINER_IMAGE, "spark-driver:latest") + .set(DRIVER_MEMORY.key, s"${driverMem.toInt}m") + factor.foreach { value => sparkConf.set(MEMORY_OVERHEAD_FACTOR, value) } + val driverConf = emptyDriverSpecificConf.copy(mainAppResource = resource) + val conf = KubernetesConf( + sparkConf, + driverConf, + RESOURCE_NAME_PREFIX, + APP_ID, + DRIVER_LABELS, + DRIVER_ANNOTATIONS, + Map.empty, + Map.empty, + DRIVER_ENVS, + Nil, + hadoopConfSpec = None) + val step = new BasicDriverFeatureStep(conf) + val pod = step.configurePod(SparkPod.initialPod()) + val mem = pod.container.getResources.getRequests.get("memory").getAmount() + val expected = (driverMem + driverMem * expectedFactor).toInt + assert(mem === s"${expected}Mi") + + val systemProperties = step.getAdditionalPodSystemProperties() + assert(systemProperties(MEMORY_OVERHEAD_FACTOR.key) === expectedFactor.toString) + } + } + def containerPort(name: String, portNumber: Int): ContainerPort = new ContainerPortBuilder() .withName(name) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 41f34bd45cd5b..e9a16aab6ccc2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -91,7 +91,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -132,7 +131,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(step.configurePod(SparkPod.initialPod()).pod.getSpec.getHostname.length === 63) } @@ -154,7 +152,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map("qux" -> "quux"), Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) @@ -182,7 +179,6 @@ class BasicExecutorFeatureStepSuite Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala new file mode 100644 index 0000000000000..30672952aaf6f --- /dev/null +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.util.Utils + +class DriverCommandFeatureStepSuite extends SparkFunSuite { + + private val MAIN_CLASS = "mainClass" + + test("java resource") { + val mainResource = "local:///main.jar" + val spec = applyFeatureStep( + JavaMainAppResource(Some(mainResource)), + appArgs = Array("5", "7")) + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "spark-internal", "5", "7")) + + val jars = Utils.stringToSeq(spec.systemProperties("spark.jars")) + assert(jars.toSet === Set(mainResource)) + } + + test("python resource with no extra files") { + val mainResource = "local:///main.py" + val sparkConf = new SparkConf(false) + .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") + + val spec = applyFeatureStep( + PythonMainAppResource(mainResource), + conf = sparkConf) + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.py")) + val envs = spec.pod.container.getEnv.asScala + .map { env => (env.getName, env.getValue) } + .toMap + assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3") + + val files = Utils.stringToSeq(spec.systemProperties("spark.files")) + assert(files.toSet === Set(mainResource)) + } + + test("python resource with extra files") { + val expectedMainResource = "/main.py" + val expectedPySparkFiles = "/example2.py:/example3.py" + val filesInConf = Set("local:///example.py") + + val mainResource = s"local://$expectedMainResource" + val pyFiles = Seq("local:///example2.py", "local:///example3.py") + + val sparkConf = new SparkConf(false) + .set("spark.files", filesInConf.mkString(",")) + .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") + val spec = applyFeatureStep( + PythonMainAppResource(mainResource), + conf = sparkConf, + appArgs = Array("5", "7", "9"), + pyFiles = pyFiles) + + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.py", "5", "7", "9")) + + val envs = spec.pod.container.getEnv.asScala + .map { env => (env.getName, env.getValue) } + .toMap + val expected = Map( + ENV_PYSPARK_FILES -> expectedPySparkFiles, + ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "2") + assert(envs === expected) + + 
val files = Utils.stringToSeq(spec.systemProperties("spark.files")) + assert(files.toSet === pyFiles.toSet ++ filesInConf ++ Set(mainResource)) + } + + test("R resource") { + val expectedMainResource = "/main.R" + val mainResource = s"local://$expectedMainResource" + + val spec = applyFeatureStep( + RMainAppResource(mainResource), + appArgs = Array("5", "7", "9")) + + assert(spec.pod.container.getArgs.asScala === List( + "driver", + "--properties-file", SPARK_CONF_PATH, + "--class", MAIN_CLASS, + "/main.R", "5", "7", "9")) + } + + private def applyFeatureStep( + resource: MainAppResource, + conf: SparkConf = new SparkConf(false), + appArgs: Array[String] = Array(), + pyFiles: Seq[String] = Nil): KubernetesDriverSpec = { + val driverConf = new KubernetesDriverSpecificConf( + resource, MAIN_CLASS, "appName", appArgs, pyFiles = pyFiles) + val kubernetesConf = KubernetesConf( + conf, + driverConf, + "resource-prefix", + "appId", + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Map.empty, + Nil, + hadoopConfSpec = None) + val step = new DriverCommandFeatureStep(kubernetesConf) + val pod = step.configurePod(SparkPod.initialPod()) + val props = step.getAdditionalPodSystemProperties() + KubernetesDriverSpec(pod, Nil, props) + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index 8675ceb48cf6d..36c6616a87b0a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -62,7 +62,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert(kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) @@ -95,7 +94,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) @@ -135,7 +133,6 @@ class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with Bef Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 5c3e801501513..3c46667c3042e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import 
org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource import org.apache.spark.util.Clock class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { @@ -59,7 +60,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -68,7 +69,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) assert(configurationStep.configurePod(SparkPod.initialPod()) === SparkPod.initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) @@ -92,7 +92,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .set(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT, 8080) .set(KUBERNETES_NAMESPACE, "my-namespace"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -101,7 +101,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX @@ -115,7 +114,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), SHORT_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -124,7 +123,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None)) val resolvedService = configurationStep .getAdditionalKubernetesResources() @@ -147,7 +145,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf.set(KUBERNETES_NAMESPACE, "my-namespace"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -156,7 +154,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) val driverService = configurationStep @@ -176,7 +173,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf.set(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS, "host"), KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", DRIVER_LABELS, @@ -185,7 +182,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) fail("The driver bind address should not be allowed.") @@ -203,7 +199,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, "main", "app", Seq.empty), + JavaMainAppResource(None), "main", "app", Seq.empty), LONG_RESOURCE_NAME_PREFIX, "app-id", 
DRIVER_LABELS, @@ -212,7 +208,6 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None), clock) fail("The driver host address should not be allowed.") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index 43796b77efdc7..3d253079c3ce7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -46,7 +46,6 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite{ envVarsToKeys, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val step = new EnvSecretsFeatureStep(kubernetesConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index 3a4e60547d7f2..894d824999aac 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" @@ -36,7 +37,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { kubernetesConf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), @@ -48,7 +49,6 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index 18e3d773f690d..1555f6a9c6527 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -44,7 +44,6 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) val step = new MountSecretsFeatureStep(kubernetesConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index 0d0a5fb951f64..2a957460ca8e0 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.deploy.k8s.features import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class MountVolumesFeatureStepSuite extends SparkFunSuite { private val sparkConf = new SparkConf(false) private val emptyKubernetesConf = KubernetesConf( sparkConf = sparkConf, roleSpecificConf = KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), @@ -36,7 +37,6 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { roleSecretEnvNamesToKeyRefs = Map.empty, roleEnvs = Map.empty, roleVolumes = Nil, - sparkFiles = Nil, hadoopConfSpec = None) test("Mounts hostPath volumes") { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala index d7bbbd121af72..370948c9502e4 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/PodTemplateConfigMapStepSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { private var sparkConf: SparkConf = _ @@ -36,7 +37,7 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { kubernetesConf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "app-name", "main", Seq.empty), @@ -48,7 +49,6 @@ class PodTemplateConfigMapStepSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) templateFile = Files.createTempFile("pod-template", "yml").toFile templateFile.deleteOnExit() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala deleted file mode 100644 index 9172e0c3dc408..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource - -class JavaDriverFeatureStepSuite extends SparkFunSuite { - - test("Java Step modifies container correctly") { - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.jar")), - "test-class", - "java-runner", - Seq("5 7")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new JavaDriverFeatureStep(kubernetesConf) - val driverPod = step.configurePod(baseDriverPod).pod - val driverContainerwithJavaStep = step.configurePod(baseDriverPod).container - assert(driverContainerwithJavaStep.getArgs.size === 7) - val args = driverContainerwithJavaStep - .getArgs.asScala - assert(args === List( - "driver", - "--properties-file", SPARK_CONF_PATH, - "--class", "test-class", - "spark-internal", "5 7")) - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala deleted file mode 100644 index 2bcc6465b79d6..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.PythonMainAppResource - -class PythonDriverFeatureStepSuite extends SparkFunSuite { - - test("Python Step modifies container correctly") { - val expectedMainResource = "/main.py" - val mainResource = "local:///main.py" - val pyFiles = Seq("local:///example2.py", "local:///example3.py") - val expectedPySparkFiles = - "/example2.py:/example3.py" - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource) - .set(KUBERNETES_PYSPARK_PY_FILES, pyFiles.mkString(",")) - .set("spark.files", "local:///example.py") - .set(PYSPARK_MAJOR_PYTHON_VERSION, "2") - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.py")), - "test-app", - "python-runner", - Seq("5", "7", "9")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new PythonDriverFeatureStep(kubernetesConf) - val driverPod = step.configurePod(baseDriverPod).pod - val driverContainerwithPySpark = step.configurePod(baseDriverPod).container - assert(driverContainerwithPySpark.getEnv.size === 4) - val envs = driverContainerwithPySpark - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_PYSPARK_PRIMARY) === expectedMainResource) - assert(envs(ENV_PYSPARK_FILES) === expectedPySparkFiles) - assert(envs(ENV_PYSPARK_ARGS) === "5 7 9") - assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "2") - } - test("Python Step testing empty pyfiles") { - val mainResource = "local:///main.py" - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - .set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, mainResource) - .set(PYSPARK_MAJOR_PYTHON_VERSION, "3") - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("local:///main.py")), - "test-class-py", - "python-runner", - Seq.empty[String]), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Nil, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - val step = new PythonDriverFeatureStep(kubernetesConf) - val driverContainerwithPySpark = step.configurePod(baseDriverPod).container - val args = driverContainerwithPySpark - .getArgs.asScala - assert(driverContainerwithPySpark.getArgs.size === 5) - assert(args === List( - "driver-py", - "--properties-file", SPARK_CONF_PATH, - "--class", "test-class-py")) - val envs = driverContainerwithPySpark - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_PYSPARK_MAJOR_PYTHON_VERSION) === "3") - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala deleted file mode 100644 index 17af6011a17d5..0000000000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.features.bindings - -import scala.collection.JavaConverters._ - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} -import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.RMainAppResource - -class RDriverFeatureStepSuite extends SparkFunSuite { - - test("R Step modifies container correctly") { - val expectedMainResource = "/main.R" - val mainResource = "local:///main.R" - val baseDriverPod = SparkPod.initialPod() - val sparkConf = new SparkConf(false) - .set(KUBERNETES_R_MAIN_APP_RESOURCE, mainResource) - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - Some(RMainAppResource(mainResource)), - "test-app", - "r-runner", - Seq("5", "7", "9")), - appResourceNamePrefix = "", - appId = "", - roleLabels = Map.empty, - roleAnnotations = Map.empty, - roleSecretNamesToMountPaths = Map.empty, - roleSecretEnvNamesToKeyRefs = Map.empty, - roleEnvs = Map.empty, - roleVolumes = Seq.empty, - sparkFiles = Seq.empty[String], - hadoopConfSpec = None) - - val step = new RDriverFeatureStep(kubernetesConf) - val driverContainerwithR = step.configurePod(baseDriverPod).container - assert(driverContainerwithR.getEnv.size === 2) - val envs = driverContainerwithR - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_R_PRIMARY) === expectedMainResource) - assert(envs(ENV_R_ARGS) === "5 7 9") - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index ae13df39b7a76..81e3822389f30 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import 
org.apache.spark.deploy.k8s.submit.JavaMainAppResource class ClientSuite extends SparkFunSuite with BeforeAndAfter { @@ -133,7 +134,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { sparkConf = new SparkConf(false) kubernetesConf = KubernetesConf[KubernetesDriverSpecificConf]( sparkConf, - KubernetesDriverSpecificConf(None, MAIN_CLASS, APP_NAME, APP_ARGS), + KubernetesDriverSpecificConf(JavaMainAppResource(None), MAIN_CLASS, APP_NAME, APP_ARGS), KUBERNETES_RESOURCE_PREFIX, APP_ID, Map.empty, @@ -142,7 +143,6 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) when(driverBuilder.buildFromFeatures(kubernetesConf)).thenReturn(BUILT_KUBERNETES_SPEC) when(kubernetesClient.pods()).thenReturn(podOperations) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 84968c3523fc0..fe900fda6e545 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -24,7 +24,6 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config.{CONTAINER_IMAGE, KUBERNETES_DRIVER_PODTEMPLATE_FILE, KUBERNETES_EXECUTOR_PODTEMPLATE_FILE} import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.deploy.k8s.features.bindings.{JavaDriverFeatureStep, PythonDriverFeatureStep, RDriverFeatureStep} class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -33,9 +32,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val SERVICE_STEP_TYPE = "service" private val LOCAL_DIRS_STEP_TYPE = "local-dirs" private val SECRETS_STEP_TYPE = "mount-secrets" - private val JAVA_STEP_TYPE = "java-bindings" - private val R_STEP_TYPE = "r-bindings" - private val PYSPARK_STEP_TYPE = "pyspark-bindings" + private val DRIVER_CMD_STEP_TYPE = "driver-command" private val ENV_SECRETS_STEP_TYPE = "env-secrets" private val HADOOP_GLOBAL_STEP_TYPE = "hadoop-global" private val MOUNT_VOLUMES_STEP_TYPE = "mount-volumes" @@ -56,14 +53,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( SECRETS_STEP_TYPE, classOf[MountSecretsFeatureStep]) - private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - JAVA_STEP_TYPE, classOf[JavaDriverFeatureStep]) - - private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - PYSPARK_STEP_TYPE, classOf[PythonDriverFeatureStep]) - - private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( - R_STEP_TYPE, classOf[RDriverFeatureStep]) + private val driverCommandStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( + DRIVER_CMD_STEP_TYPE, classOf[DriverCommandFeatureStep]) private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, classOf[EnvSecretsFeatureStep]) @@ -87,9 +78,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => envSecretsStep, _ => localDirsStep, _ => mountVolumesStep, - _ => pythonStep, - _ => rStep, - _ => javaStep, + _ => driverCommandStep, _ => 
hadoopGlobalStep, _ => templateVolumeStep) @@ -97,7 +86,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -109,7 +98,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -117,14 +105,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE) + DRIVER_CMD_STEP_TYPE) } test("Apply secrets step if secrets are present.") { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -136,7 +124,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map("EnvName" -> "SecretName:secretKey"), Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -146,61 +133,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, ENV_SECRETS_STEP_TYPE, - JAVA_STEP_TYPE) - } - - test("Apply Java step if main resource is none.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - None, - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String], - hadoopConfSpec = None) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE) - } - - test("Apply Python step if main resource is python.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - Some(PythonMainAppResource("example.py")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String], - hadoopConfSpec = None) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - PYSPARK_STEP_TYPE) + DRIVER_CMD_STEP_TYPE) } test("Apply volumes step if mounts are present.") { @@ -212,7 +145,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -224,7 +157,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String], hadoopConfSpec = None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -233,34 +165,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE, - JAVA_STEP_TYPE) - } - - test("Apply R step if main resource is R.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - Some(RMainAppResource("example.R")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Seq.empty[String], - hadoopConfSpec = None) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - 
CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - R_STEP_TYPE) + DRIVER_CMD_STEP_TYPE) } test("Apply template volume step if executor template is present.") { @@ -270,7 +175,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -282,7 +187,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -290,7 +194,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE, + DRIVER_CMD_STEP_TYPE, TEMPLATE_VOLUME_STEP_TYPE) } @@ -298,7 +202,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -310,7 +214,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = Some( HadoopConfSpec( Some("/var/hadoop-conf"), @@ -321,7 +224,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE, + DRIVER_CMD_STEP_TYPE, HADOOP_GLOBAL_STEP_TYPE) } @@ -329,7 +232,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf( - None, + JavaMainAppResource(None), "test-app", "main", Seq.empty), @@ -341,7 +244,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], hadoopConfSpec = Some( HadoopConfSpec( None, @@ -352,7 +254,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE, + DRIVER_CMD_STEP_TYPE, HADOOP_GLOBAL_STEP_TYPE) } @@ -381,7 +283,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val kubernetesConf = new KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -393,7 +295,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) val driverSpec = KubernetesDriverBuilder .apply(kubernetesClient, sparkConf) @@ -414,7 +315,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val kubernetesConf = new KubernetesConf( sparkConf, KubernetesDriverSpecificConf( - Some(JavaMainAppResource("example.jar")), + JavaMainAppResource(Some("example.jar")), "test-app", "main", Seq.empty), @@ -426,7 +327,6 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) val exception = intercept[SparkException] { KubernetesDriverBuilder diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index fb2509fc1bda5..1fea08c37ccc6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -76,7 +76,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) @@ -95,7 +94,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map("secret-name" -> "secret-key"), Map.empty, Nil, - Seq.empty[String], None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -123,7 +121,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, volumeSpec :: Nil, - Seq.empty[String], None) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -152,7 +149,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Some(HadoopConfSpec(Some("/var/hadoop-conf"), None))) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -180,7 +176,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Some(HadoopConfSpec(None, Some("pre-defined-onfigMapName")))) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), @@ -225,7 +220,6 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Map.empty, Map.empty, Nil, - Seq.empty[String], Option.empty) val sparkPod = KubernetesExecutorBuilder .apply(kubernetesClient, sparkConf) diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 4958b7363fee0..2b2a4e4cf6bcc 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -96,22 +96,6 @@ case "$SPARK_K8S_CMD" in "$@" ) ;; - driver-py) - CMD=( - "$SPARK_HOME/bin/spark-submit" - --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" - --deploy-mode client - "$@" $PYSPARK_PRIMARY $PYSPARK_ARGS - ) - ;; - driver-r) - CMD=( - "$SPARK_HOME/bin/spark-submit" - --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS" - --deploy-mode client - "$@" $R_PRIMARY $R_ARGS - ) - ;; executor) CMD=( ${JAVA_HOME}/bin/java
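The net effect of the test and entrypoint changes above is that the per-language driver bindings (the deleted JavaDriverFeatureStepSuite, PythonDriverFeatureStepSuite, and RDriverFeatureStepSuite, together with the removed driver-py / driver-r entrypoint cases) collapse into a single DriverCommandFeatureStep that emits one "driver" command line for every application type. Below is a minimal sketch of the argument-building behavior the new tests assert; the stand-in case classes, the SPARK_CONF_PATH value, and the downloadedPath helper are illustrative assumptions, not the actual implementation in this patch.

```scala
// A hypothetical, simplified sketch of the consolidated driver-command logic.
// The real DriverCommandFeatureStep also handles spark.files / pyFiles
// distribution and Python version selection; none of that is modeled here.

sealed trait MainAppResource
case class JavaMainAppResource(resource: Option[String]) extends MainAppResource
case class PythonMainAppResource(resource: String) extends MainAppResource
case class RMainAppResource(resource: String) extends MainAppResource

object DriverCommandSketch {

  // Assumed path of the driver's generated properties file inside the pod.
  val SPARK_CONF_PATH = "/opt/spark/conf/spark.properties"

  // "local://" URIs refer to files already present in the image; the tests
  // above expect "local:///main.R" to surface as "/main.R" in the args.
  private def downloadedPath(resource: String): String =
    resource.stripPrefix("local://")

  def driverArgs(
      resource: MainAppResource,
      mainClass: String,
      appArgs: Seq[String]): Seq[String] = {
    // A Java app may carry no primary resource, in which case spark-submit's
    // "spark-internal" placeholder stands in (as the deleted
    // JavaDriverFeatureStepSuite asserted for its args list).
    val mainResource = resource match {
      case JavaMainAppResource(res) => res.map(downloadedPath).getOrElse("spark-internal")
      case PythonMainAppResource(res) => downloadedPath(res)
      case RMainAppResource(res) => downloadedPath(res)
    }
    // One command shape for all languages: the entrypoint's single "driver"
    // case replaces the removed driver-py / driver-r variants.
    Seq("driver", "--properties-file", SPARK_CONF_PATH, "--class", mainClass) ++
      (mainResource +: appArgs)
  }
}
```

Under these assumptions, `DriverCommandSketch.driverArgs(RMainAppResource("local:///main.R"), "MainClass", Seq("5", "7", "9"))` returns `Seq("driver", "--properties-file", SPARK_CONF_PATH, "--class", "MainClass", "/main.R", "5", "7", "9")`, matching the expectation in the "R resource" test above.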