From 6c7fb165f0ee53559bad158d88a6e04d6c86e43d Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 28 Feb 2017 12:16:15 -0800
Subject: [PATCH 1/3] Allow setting memory on the driver submission server.

---
 docs/running-on-kubernetes.md                 | 16 +++++++++++
 .../spark/deploy/kubernetes/Client.scala      | 28 +++++++++++++++++++
 .../spark/deploy/kubernetes/config.scala      | 20 +++++++++++++
 .../spark/deploy/kubernetes/constants.scala   |  3 ++
 .../KubernetesClusterSchedulerBackend.scala   |  4 +--
 5 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 9d49ac6829723..8f3866e0de1ba 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -178,6 +178,22 @@ from the other deployment modes. See the [configuration page](configuration.html
     (typically 6-10%).
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.submitServerMemory</code></td>
+  <td><code>256m</code></td>
+  <td>
+    The amount of memory to allocate for the driver submission server.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.memoryOverhead</code></td>
+  <td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</td>
+  <td>
+    The amount of off-heap memory (in megabytes) to be allocated for the driver. This is memory that accounts for things
+    like VM overheads, interned strings, other native overheads, etc. This tends to grow with the driver size
+    (typically 6-10%).
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.labels</code></td>
   <td>(none)</td>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index c787d5917e381..fc7ae954d00f2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -63,6 +63,19 @@ private[spark] class Client(
     .map(_.split(","))
     .getOrElse(Array.empty[String])
 
+  // Memory settings
+  private val driverMemory = sparkConf.get("spark.driver.memory", "1g")
+  private val driverMemoryBytes = Utils.byteStringAsBytes(driverMemory)
+  private val driverSubmitServerMemory = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
+  private val driverSubmitServerMemoryBytes = Utils.byteStringAsBytes(driverSubmitServerMemory)
+  private val driverContainerMemoryBytes = driverMemoryBytes + driverSubmitServerMemoryBytes
+  private val memoryOverheadBytes = sparkConf
+    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+    .map(overhead => Utils.byteStringAsBytes(overhead))
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryBytes).toInt,
+      MEMORY_OVERHEAD_MIN))
+  private val driverContainerMemoryWithOverhead = driverContainerMemoryBytes + memoryOverheadBytes
+
   private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
 
   private val secretBase64String = {
@@ -373,6 +386,12 @@ private[spark] class Client(
       .withPath("/v1/submissions/ping")
       .withNewPort(SUBMISSION_SERVER_PORT_NAME)
       .build()
+    val driverMemoryQuantity = new QuantityBuilder(false)
+      .withAmount(driverContainerMemoryBytes.toString)
+      .build()
+    val driverMemoryLimitQuantity = new QuantityBuilder(false)
+      .withAmount(driverContainerMemoryWithOverhead.toString)
+      .build()
     kubernetesClient.pods().createNew()
       .withNewMetadata()
         .withName(kubernetesAppId)
@@ -406,7 +425,16 @@ private[spark] class Client(
             .withName(ENV_SUBMISSION_SERVER_PORT)
             .withValue(SUBMISSION_SERVER_PORT.toString)
             .endEnv()
+          // Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
+          .addNewEnv()
+            .withName(ENV_DRIVER_MEMORY)
+            .withValue(driverSubmitServerMemory)
+            .endEnv()
           .addToEnv(sslConfiguration.sslPodEnvVars: _*)
+          .withNewResources()
+            .addToRequests("memory", driverMemoryQuantity)
+            .addToLimits("memory", driverMemoryLimitQuantity)
+            .endResources()
           .withPorts(containerPorts.asJava)
          .withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
          .endContainer()
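
Review note: the sizing rule added to Client.scala above reduces to a small pure
function. A minimal self-contained sketch, assuming only the semantics of
Utils.byteStringAsBytes; the object wrapper, parameter names, and comments are
illustrative and not part of the change:

    import org.apache.spark.util.Utils

    object DriverMemorySizingSketch {
      private val MEMORY_OVERHEAD_FACTOR = 0.10
      private val MEMORY_OVERHEAD_MIN = 384L

      // Container memory = driver heap + submission server heap; the overhead
      // defaults to max(10% of that total, 384) when not configured.
      def containerMemoryWithOverheadBytes(
          driverMemory: String,          // e.g. "1g" (spark.driver.memory)
          submitServerMemory: String,    // e.g. "256m" (the new setting)
          configuredOverhead: Option[String]): Long = {
        val containerBytes = Utils.byteStringAsBytes(driverMemory) +
          Utils.byteStringAsBytes(submitServerMemory)
        val overheadBytes = configuredOverhead
          .map(Utils.byteStringAsBytes)
          .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * containerBytes).toLong,
            MEMORY_OVERHEAD_MIN))
        containerBytes + overheadBytes
      }
    }

Note that at this point in the series the 384 minimum is compared against a
byte count rather than megabytes; patch 3 resolves this by doing all of the
arithmetic in MiB.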
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index a21ec2101cc6e..491064c5b214c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -106,6 +106,18 @@ package object config {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+    ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+      .doc("""
+        | The amount of off-heap memory (in megabytes) to be
+        | allocated for the driver. This is memory that accounts for
+        | things like VM overheads, interned strings, other native
+        | overheads, etc. This tends to grow with the driver's memory
+        | size (typically 6-10%).
+      """.stripMargin)
+      .stringConf
+      .createOptional
+
   private[spark] val KUBERNETES_DRIVER_LABELS =
     ConfigBuilder("spark.kubernetes.driver.labels")
       .doc("""
@@ -156,6 +168,14 @@ package object config {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
+    ConfigBuilder("spark.kubernetes.driver.submitServerMemory")
+      .doc("""
+        | The amount of memory to allocate for the driver submission server.
+      """.stripMargin)
+      .stringConf
+      .createWithDefault("256m")
+
   private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
     ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
       .doc("""
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 688cd858e79ff..f3c383d431cef 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -63,9 +63,12 @@ package object constants {
   private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
   private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
   private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+  private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
 
   // Miscellaneous
   private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
   private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
   private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
+  private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
+  private[spark] val MEMORY_OVERHEAD_MIN = 384L
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index 898b215b92d04..f2cf325b16eda 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -60,7 +60,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
 
-  private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
+  private val executorMemory = conf.get("spark.executor.memory", "1g")
   private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
 
   private val memoryOverheadBytes = conf
@@ -261,7 +261,5 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
 private object KubernetesClusterSchedulerBackend {
   private val DEFAULT_STATIC_PORT = 10000
-  private val MEMORY_OVERHEAD_FACTOR = 0.10
-  private val MEMORY_OVERHEAD_MIN = 384L
   private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
 }
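
Review note: the two builders added to config.scala differ in how absence is
handled, since the overhead is createOptional (None unless set) while the
submission server memory carries a "256m" default. A sketch of consuming them
at this point in the series, placed in the same package because
SparkConf.get(entry) is private[spark]; the object and method names are
illustrative:

    package org.apache.spark.deploy.kubernetes

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.kubernetes.config._

    object ConfigReadSketch {
      // Both entries are still string-typed here; patch 3 retypes them as
      // byte sizes in MiB.
      def read(sparkConf: SparkConf): (Option[String], String) = {
        val overhead = sparkConf.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
        val submitServerMemory = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
        (overhead, submitServerMemory)
      }
    }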
From 3c4aff23af62b3fb1aa1c39fbb9b274524c7f568 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 28 Feb 2017 14:56:58 -0800
Subject: [PATCH 2/3] Address comments

---
 docs/running-on-kubernetes.md                       | 12 ++++++------
 .../org/apache/spark/deploy/kubernetes/config.scala | 10 +++++-----
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 8f3866e0de1ba..73658474f918d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -171,7 +171,7 @@ from the other deployment modes. See the [configuration page](configuration.html
 <tr>
   <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
-  <td>executorMemory * 0.10, with minimum of 384</td>
+  <td><code>executorMemory * 0.10, with minimum of 384</code></td>
   <td>
     The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things
     like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size
     (typically 6-10%).
@@ -179,7 +179,7 @@ from the other deployment modes. See the [configuration page](configuration.html
 </tr>
 <tr>
-  <td><code>spark.kubernetes.driver.submitServerMemory</code></td>
+  <td><code>spark.kubernetes.driver.submissionServerMemory</code></td>
   <td><code>256m</code></td>
   <td>
     The amount of memory to allocate for the driver submission server.
   </td>
@@ -187,11 +187,11 @@ from the other deployment modes. See the [configuration page](configuration.html
 </tr>
 <tr>
   <td><code>spark.kubernetes.driver.memoryOverhead</code></td>
-  <td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</td>
+  <td><code>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</code></td>
   <td>
-    The amount of off-heap memory (in megabytes) to be allocated for the driver. This is memory that accounts for things
-    like VM overheads, interned strings, other native overheads, etc. This tends to grow with the driver size
-    (typically 6-10%).
+    The amount of off-heap memory (in megabytes) to be allocated for the driver and the driver submission server. This
+    is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to
+    grow with the driver size (typically 6-10%).
   </td>
 </tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 491064c5b214c..978816565fb8f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -110,10 +110,10 @@ package object config {
     ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
       .doc("""
         | The amount of off-heap memory (in megabytes) to be
-        | allocated for the driver. This is memory that accounts for
-        | things like VM overheads, interned strings, other native
-        | overheads, etc. This tends to grow with the driver's memory
-        | size (typically 6-10%).
+        | allocated for the driver and the driver submission server.
+        | This is memory that accounts for things like VM overheads,
+        | interned strings, other native overheads, etc. This tends
+        | to grow with the driver's memory size (typically 6-10%).
       """.stripMargin)
       .stringConf
       .createOptional
@@ -169,7 +169,7 @@ package object config {
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
-    ConfigBuilder("spark.kubernetes.driver.submitServerMemory")
+    ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
       .doc("""
         | The amount of memory to allocate for the driver submission server.
       """.stripMargin)
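
Review note: the key rename is not aliased, so anything still setting the old
spelling silently falls back to the 256m default. A quick illustrative check;
the object name and values are hypothetical:

    import org.apache.spark.SparkConf

    object RenameSketch extends App {
      val conf = new SparkConf(loadDefaults = false)
        .set("spark.kubernetes.driver.submissionServerMemory", "512m") // new key
      assert(conf.contains("spark.kubernetes.driver.submissionServerMemory"))
      assert(!conf.contains("spark.kubernetes.driver.submitServerMemory")) // old key no longer read
    }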
From 7321a3effb71ff8f65f664b1dc858c0b99514f6a Mon Sep 17 00:00:00 2001
From: mcheah
Date: Thu, 2 Mar 2017 16:17:42 -0800
Subject: [PATCH 3/3] Address comments

---
 .../spark/deploy/kubernetes/Client.scala      | 24 +++++++++----------
 .../spark/deploy/kubernetes/config.scala      |  9 +++----
 .../KubernetesClusterSchedulerBackend.scala   | 19 ++++++++-------
 3 files changed, 27 insertions(+), 25 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index fc7ae954d00f2..24f8647a1641e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -64,17 +64,17 @@ private[spark] class Client(
     .getOrElse(Array.empty[String])
 
   // Memory settings
-  private val driverMemory = sparkConf.get("spark.driver.memory", "1g")
-  private val driverMemoryBytes = Utils.byteStringAsBytes(driverMemory)
-  private val driverSubmitServerMemory = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
-  private val driverSubmitServerMemoryBytes = Utils.byteStringAsBytes(driverSubmitServerMemory)
-  private val driverContainerMemoryBytes = driverMemoryBytes + driverSubmitServerMemoryBytes
-  private val memoryOverheadBytes = sparkConf
+  private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
+  private val driverSubmitServerMemoryString = sparkConf.get(
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
+    KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
+  private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
+  private val memoryOverheadMb = sparkConf
     .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
-    .map(overhead => Utils.byteStringAsBytes(overhead))
-    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryBytes).toInt,
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
       MEMORY_OVERHEAD_MIN))
-  private val driverContainerMemoryWithOverhead = driverContainerMemoryBytes + memoryOverheadBytes
+  private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
 
   private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
 
@@ -387,10 +387,10 @@ private[spark] class Client(
       .withNewPort(SUBMISSION_SERVER_PORT_NAME)
       .build()
     val driverMemoryQuantity = new QuantityBuilder(false)
-      .withAmount(driverContainerMemoryBytes.toString)
+      .withAmount(s"${driverContainerMemoryMb}M")
       .build()
     val driverMemoryLimitQuantity = new QuantityBuilder(false)
-      .withAmount(driverContainerMemoryWithOverhead.toString)
+      .withAmount(s"${driverContainerMemoryWithOverhead}M")
       .build()
     kubernetesClient.pods().createNew()
       .withNewMetadata()
@@ -428,7 +428,7 @@ private[spark] class Client(
           // Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
           .addNewEnv()
             .withName(ENV_DRIVER_MEMORY)
-            .withValue(driverSubmitServerMemory)
+            .withValue(driverSubmitServerMemoryString)
             .endEnv()
           .addToEnv(sslConfiguration.sslPodEnvVars: _*)
           .withNewResources()
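
Review note: the dual read of KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY above is
deliberate. The typed read yields a MiB count for the arithmetic, while the
raw-string read preserves the user's spelling (for example "256m") so it can be
exported as SPARK_DRIVER_MEMORY for spark-class. A sketch of the pattern, in
the same package because SparkConf.get(entry) is private[spark]; the object
name is illustrative:

    package org.apache.spark.deploy.kubernetes

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.kubernetes.config._

    object DualReadSketch {
      // Returns (MiB for sizing math, raw string for the JVM memory flag).
      def submitServerMemory(sparkConf: SparkConf): (Long, String) = {
        val mib = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
        val raw = sparkConf.get(
          KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
          KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
        (mib, raw)
      }
    }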
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 978816565fb8f..c7f97b70e5181 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{SPARK_VERSION => sparkVersion}
 import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
 
 package object config {
 
@@ -103,7 +104,7 @@ package object config {
         | overheads, etc. This tends to grow with the executor size
         | (typically 6-10%).
       """.stripMargin)
-      .stringConf
+      .bytesConf(ByteUnit.MiB)
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
@@ -115,7 +116,7 @@ package object config {
         | interned strings, other native overheads, etc. This tends
         | to grow with the driver's memory size (typically 6-10%).
       """.stripMargin)
-      .stringConf
+      .bytesConf(ByteUnit.MiB)
       .createOptional
 
   private[spark] val KUBERNETES_DRIVER_LABELS =
@@ -173,8 +174,8 @@ package object config {
       .doc("""
         | The amount of memory to allocate for the driver submission server.
       """.stripMargin)
-      .stringConf
-      .createWithDefault("256m")
+      .bytesConf(ByteUnit.MiB)
+      .createWithDefaultString("256m")
 
   private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
     ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
       .doc("""
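
Review note: with bytesConf(ByteUnit.MiB), Spark parses these values through
JavaUtils.byteStringAs, so suffixed byte strings are converted to whole MiB and
a bare number is read as MiB directly. A small illustrative check:

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    object ByteConfSketch extends App {
      println(JavaUtils.byteStringAs("256m", ByteUnit.MiB)) // 256
      println(JavaUtils.byteStringAs("1g", ByteUnit.MiB))   // 1024
      println(JavaUtils.byteStringAs("512", ByteUnit.MiB))  // 512, unit-less means MiB
    }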
""".stripMargin) - .stringConf - .createWithDefault("256m") + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("256m") private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT = ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index f2cf325b16eda..90907ff83ed84 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend( .getOrElse( throw new SparkException("Must specify the driver pod name")) - private val executorMemory = conf.get("spark.executor.memory", "1g") - private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory) + private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY) + private val executorMemoryString = conf.get( + org.apache.spark.internal.config.EXECUTOR_MEMORY.key, + org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString) - private val memoryOverheadBytes = conf + private val memoryOverheadMb = conf .get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD) - .map(overhead => Utils.byteStringAsBytes(overhead)) - .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt, + .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt, MEMORY_OVERHEAD_MIN)) - private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes + private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1") @@ -165,10 +166,10 @@ private[spark] class KubernetesClusterSchedulerBackend( val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId, SPARK_APP_ID_LABEL -> applicationId()).asJava val executorMemoryQuantity = new QuantityBuilder(false) - .withAmount(executorMemoryBytes.toString) + .withAmount(s"${executorMemoryMb}M") .build() val executorMemoryLimitQuantity = new QuantityBuilder(false) - .withAmount(executorMemoryWithOverhead.toString) + .withAmount(s"${executorMemoryWithOverhead}M") .build() val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores) @@ -177,7 +178,7 @@ private[spark] class KubernetesClusterSchedulerBackend( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), (ENV_EXECUTOR_CORES, executorCores), - (ENV_EXECUTOR_MEMORY, executorMemory), + (ENV_EXECUTOR_MEMORY, executorMemoryString), (ENV_APPLICATION_ID, applicationId()), (ENV_EXECUTOR_ID, executorId) ).map(env => new EnvVarBuilder()