diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 1bebaf92501f4..e200dd211df6f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit import java.io.File import java.util.Collections -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder, QuantityBuilder} import io.fabric8.kubernetes.client.KubernetesClient import scala.collection.JavaConverters._ @@ -61,6 +61,11 @@ private[spark] class Client( .getOrElse(kubernetesAppId) private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) + + // CPU settings + private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1") + + // Memory settings private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY) private val memoryOverheadMb = sparkConf .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD) @@ -97,6 +102,15 @@ private[spark] class Client( .withValue(classPath) .build() } + val driverCpuQuantity = new QuantityBuilder(false) + .withAmount(driverCpuCores) + .build() + val driverMemoryQuantity = new QuantityBuilder(false) + .withAmount(s"${driverMemoryMb}M") + .build() + val driverMemoryLimitQuantity = new QuantityBuilder(false) + .withAmount(s"${driverContainerMemoryWithOverhead}M") + .build() val driverContainer = new ContainerBuilder() .withName(DRIVER_CONTAINER_NAME) .withImage(driverDockerImage) @@ -114,6 +128,12 @@ private[spark] class Client( .withName(ENV_DRIVER_ARGS) .withValue(appArgs.mkString(" ")) .endEnv() + .withNewResources() + .addToRequests("cpu", driverCpuQuantity) + .addToLimits("cpu", driverCpuQuantity) + .addToRequests("memory", driverMemoryQuantity) + .addToLimits("memory", driverMemoryLimitQuantity) + .endResources() .build() val basePod = new PodBuilder() .withNewMetadata() @@ -261,7 +281,8 @@ private[spark] object Client { .getOrElse(Array.empty[String]) val appName = sparkConf.getOption("spark.app.name") .getOrElse("spark") - val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-") + val kubernetesAppId = sparkConf.getOption("spark.kubernetes.driver.pod.name") + .getOrElse(s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")) val namespace = sparkConf.get(KUBERNETES_NAMESPACE) val master = resolveK8sMaster(sparkConf.get("spark.master")) val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)