From 42c3a6c73152ccfe6a8725c28a4a2f9f0e20a9a9 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Thu, 29 Jun 2017 10:13:19 -0700 Subject: [PATCH] Allow number of executor cores to have fractional values This commit tries to solve issue #359 by allowing the `spark.executor.cores` configuration key to take fractional values, e.g., 0.5 or 1.5. The value is used to specify the CPU request when creating the executor pods, which is allowed to be fractional by Kubernetes. When the value is passed to the executor process through the environment variable `SPARK_EXECUTOR_CORES`, the value is rounded up to the nearest integer as required by the `CoarseGrainedExecutorBackend`. Signed-off-by: Yinan Li --- .../kubernetes/KubernetesClusterSchedulerBackend.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 31cf929b94e8b..d880cee315c0d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -107,7 +107,7 @@ private[spark] class KubernetesClusterSchedulerBackend( MEMORY_OVERHEAD_MIN)) private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb - private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") + private val executorCores = conf.getDouble("spark.executor.cores", 1d) private val executorLimitCores = conf.getOption(KUBERNETES_EXECUTOR_LIMIT_CORES.key) private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( @@ -377,7 +377,7 @@ private[spark] class KubernetesClusterSchedulerBackend( 
.withAmount(s"${executorMemoryWithOverhead}M") .build() val executorCpuQuantity = new QuantityBuilder(false) - .withAmount(executorCores) + .withAmount(executorCores.toString) .build() val executorExtraClasspathEnv = executorExtraClasspath.map { cp => new EnvVarBuilder() @@ -388,7 +388,8 @@ private[spark] class KubernetesClusterSchedulerBackend( val requiredEnv = Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), - (ENV_EXECUTOR_CORES, executorCores), + // Executor backend expects integral value for executor cores, so round it up to an int. + (ENV_EXECUTOR_CORES, math.ceil(executorCores).toInt.toString), (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId()), (ENV_EXECUTOR_ID, executorId),