diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 52d847b4420cf..3a50860f826c5 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -718,6 +718,20 @@ from the other deployment modes. See the [configuration page](configuration.html
Docker image pull policy used when pulling Docker images with Kubernetes.
+<tr>
+  <td><code>spark.kubernetes.driver.limit.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the hard cpu limit for the driver pod
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.limit.cores</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the hard cpu limit for a single executor pod
+  </td>
+</tr>
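
Both options default to unset, in which case no CPU limit is attached to the pod. As a quick illustration (a sketch, not part of this diff), they can be set programmatically, or equivalently with --conf on spark-submit; values follow the Kubernetes CPU quantity syntax:

    import org.apache.spark.SparkConf

    // Illustrative values only: "1" is one core, "500m" is half a core.
    val conf = new SparkConf()
      .set("spark.kubernetes.driver.limit.cores", "1")
      .set("spark.kubernetes.executor.limit.cores", "500m")
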
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 70ea19e44ef8c..e1c1ab9d459fc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -485,6 +485,18 @@ package object config extends Logging {
.stringConf
.createOptional
+  private[spark] val KUBERNETES_DRIVER_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.driver.limit.cores")
+      .doc("Specify the hard cpu limit for the driver pod")
+      .stringConf
+      .createOptional
+
+  private[spark] val KUBERNETES_EXECUTOR_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.executor.limit.cores")
+      .doc("Specify the hard cpu limit for a single executor pod")
+      .stringConf
+      .createOptional
+
private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
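
Both entries use .stringConf.createOptional, so no default is ever supplied and a read yields Option[String]. A small sketch of the two equivalent ways to read such an entry (assuming a SparkConf named sparkConf in scope inside an org.apache.spark package, since typed ConfigEntry reads are private[spark]; the change below uses the key-based form):

    // Both expressions yield Option[String] for an optional entry.
    val byKey: Option[String] = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)
    val byEntry: Option[String] = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
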
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index ac3a51e74f838..8220127eac449 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -64,6 +64,7 @@ private[spark] class Client(
// CPU settings
private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
+ private val driverLimitCores = sparkConf.getOption(KUBERNETES_DRIVER_LIMIT_CORES.key)
// Memory settings
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
@@ -139,7 +140,6 @@ private[spark] class Client(
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
- .addToLimits("cpu", driverCpuQuantity)
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryLimitQuantity)
.endResources()
@@ -156,6 +156,21 @@ private[spark] class Client(
.addToContainers(driverContainer)
.endSpec()
+    driverLimitCores.foreach { limitCores =>
+      val driverCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+
+      basePod
+        .editSpec()
+          .editFirstContainer()
+            .editResources()
+              .addToLimits("cpu", driverCpuLimitQuantity)
+            .endResources()
+          .endContainer()
+        .endSpec()
+    }
+
val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(allDriverLabels)
.map { uploader =>
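
The edit above always sets the CPU request but attaches a "cpu" limit only when spark.kubernetes.driver.limit.cores is present. A self-contained sketch of the same fabric8 builder pattern (buildPod, the container name, and the "1" request are illustrative, not names from this change):

    import io.fabric8.kubernetes.api.model.{PodBuilder, QuantityBuilder}

    def buildPod(limitCores: Option[String]): PodBuilder = {
      val basePod = new PodBuilder()
        .withNewSpec()
          .addNewContainer()
            .withName("driver")
            .withNewResources()
              .addToRequests("cpu", new QuantityBuilder(false).withAmount("1").build())
            .endResources()
          .endContainer()
        .endSpec()
      // editSpec()/endSpec() applies the nested edit back onto basePod, so the
      // chain's return value can be discarded, as in the change above.
      limitCores.foreach { cores =>
        basePod
          .editSpec()
            .editFirstContainer()
              .editResources()
                .addToLimits("cpu", new QuantityBuilder(false).withAmount(cores).build())
              .endResources()
            .endContainer()
          .endSpec()
      }
      basePod
    }
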
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 4165eb8cbd067..31cf929b94e8b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -108,6 +108,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")
+ private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key)
private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests"))
@@ -438,7 +439,6 @@ private[spark] class KubernetesClusterSchedulerBackend(
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryLimitQuantity)
.addToRequests("cpu", executorCpuQuantity)
- .addToLimits("cpu", executorCpuQuantity)
.endResources()
.addAllToEnv(requiredEnv.asJava)
.addToEnv(executorExtraClasspathEnv.toSeq: _*)
@@ -446,6 +446,21 @@ private[spark] class KubernetesClusterSchedulerBackend(
.endContainer()
.endSpec()
+    executorLimitCores.foreach { limitCores =>
+      val executorCpuLimitQuantity = new QuantityBuilder(false)
+        .withAmount(limitCores)
+        .build()
+
+      basePodBuilder
+        .editSpec()
+          .editFirstContainer()
+            .editResources()
+              .addToLimits("cpu", executorCpuLimitQuantity)
+            .endResources()
+          .endContainer()
+        .endSpec()
+    }
+
val withMaybeShuffleConfigPodBuilder = shuffleServiceConfig
.map { config =>
config.shuffleDirs.foldLeft(basePodBuilder) { (builder, dir) =>
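
The executor path mirrors the driver path. As a quick sanity check of the pattern (hypothetical assertions against the buildPod sketch above, not a test shipped with this change):

    // No configured limit: the container's limits map carries no "cpu" key.
    val unlimited = buildPod(None).build()
    val limits = unlimited.getSpec.getContainers.get(0).getResources.getLimits
    assert(limits == null || !limits.containsKey("cpu"))

    // Configured limit: the quantity round-trips through the builder.
    val limited = buildPod(Some("2")).build()
    assert(limited.getSpec.getContainers.get(0).getResources
      .getLimits.get("cpu").getAmount == "2")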