From 8c2941cca1707d7b7f4fc38a223e768804d5173c Mon Sep 17 00:00:00 2001
From: duyanghao <1294057873@qq.com>
Date: Thu, 8 Jun 2017 19:53:51 +0800
Subject: [PATCH 1/2] Support specify CPU cores and Memory restricts for driver

Signed-off-by: duyanghao <1294057873@qq.com>
---
 .../deploy/kubernetes/submit/Client.scala     | 22 ++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index 1bebaf92501f4..0544bf064844f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes.submit
 import java.io.File
 import java.util.Collections
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 
@@ -61,6 +61,11 @@ private[spark] class Client(
     .getOrElse(kubernetesAppId)
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+
+  // CPU settings
+  private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
+
+  // Memory settings
   private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
   private val memoryOverheadMb = sparkConf
     .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
@@ -97,6 +102,15 @@ private[spark] class Client(
         .withValue(classPath)
         .build()
     }
+    val driverCpuQuantity = new QuantityBuilder(false)
+      .withAmount(driverCpuCores)
+      .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverMemoryMb}M")
+      .build()
+    val driverMemoryLimitQuantity = new QuantityBuilder(false)
+      .withAmount(s"${driverContainerMemoryWithOverhead}M")
+      .build()
     val driverContainer = new ContainerBuilder()
       .withName(DRIVER_CONTAINER_NAME)
       .withImage(driverDockerImage)
@@ -114,6 +128,12 @@ private[spark] class Client(
         .withName(ENV_DRIVER_ARGS)
         .withValue(appArgs.mkString(" "))
         .endEnv()
+      .withNewResources()
+        .addToRequests("cpu", driverCpuQuantity)
+        .addToLimits("cpu", driverCpuQuantity)
+        .addToRequests("memory", driverMemoryQuantity)
+        .addToLimits("memory", driverMemoryLimitQuantity)
+        .endResources()
       .build()
     val basePod = new PodBuilder()
       .withNewMetadata()

From abff943b90b4d40aa2d93df71625f82030f9eeba Mon Sep 17 00:00:00 2001
From: duyanghao <1294057873@qq.com>
Date: Fri, 9 Jun 2017 10:46:26 +0800
Subject: [PATCH 2/2] Set driver and executor Label:spark-app-id = `--conf
 spark.kubernetes.driver.pod.name`

Signed-off-by: duyanghao <1294057873@qq.com>
---
 .../org/apache/spark/deploy/kubernetes/submit/Client.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index 0544bf064844f..e200dd211df6f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -281,7 +281,8 @@ private[spark] object Client {
       .getOrElse(Array.empty[String])
     val appName = sparkConf.getOption("spark.app.name")
       .getOrElse("spark")
-    val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val kubernetesAppId = sparkConf.getOption("spark.kubernetes.driver.pod.name")
+      .getOrElse(s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-"))
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val master = resolveK8sMaster(sparkConf.get("spark.master"))
     val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)