This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
18 changes: 17 additions & 1 deletion docs/running-on-kubernetes.md
@@ -201,13 +201,29 @@ from the other deployment modes. See the [configuration page](configuration.html
</tr>
<tr>
<td><code>spark.kubernetes.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>executorMemory * 0.10, with minimum of 384</td>
<td>
The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things
like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size
(typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.submissionServerMemory</code></td>
<td>256m</td>
<td>
The amount of memory to allocate for the driver submission server.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.memoryOverhead</code></td>
<td>(driverMemory + driverSubmissionServerMemory) * 0.10, with minimum of 384</td>
<td>
The amount of off-heap memory (in megabytes) to be allocated for the driver and the driver submission server. This
is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to
grow with the driver size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.kubernetes.driver.labels</code></td>
<td>(none)</td>
@@ -64,6 +64,19 @@ private[spark] class Client(
.map(_.split(","))
.getOrElse(Array.empty[String])

// Memory settings
private val driverMemoryMb = sparkConf.get(org.apache.spark.internal.config.DRIVER_MEMORY)
private val driverSubmitServerMemoryMb = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY)
private val driverSubmitServerMemoryString = sparkConf.get(
Member: Why do we have separate driverSubmitServerMemoryMb and driverSubmitServerMemoryString, and similarly for executors?

Author: The strings are used in the environment variables so that they can be passed directly to the JVMs' launch commands.

Member: It seems like we could achieve this using Utils.memoryStringToMb. Just a minor nit, because it seems like repeated logic to fetch the same parameter in different ways.

Author: The trouble there is that when we use SparkConf.get(config) it returns a long, not a string, where the long is the number of megabytes pre-converted from the string value. I think the issue is that we want to go in the other direction; that is, to convert the numeric value we get from SparkConf.get into a memory string.

Member: Hmm, I wonder if the Utils package has a helper for that. But in any case, it isn't a major concern. Merging. Thanks!

KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.key,
KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY.defaultValueString)
private val driverContainerMemoryMb = driverMemoryMb + driverSubmitServerMemoryMb
Member: Why do we add the two here? Shouldn't we ideally take the max(...), since only one of them is running at a time, either the submit server or the driver code?

Author: Actually, we don't shut down the submit server after it starts the application, so both will be running the whole time.

Member: Saw @ash211 opened an issue. We should solve this in a better way after the alpha.

private val memoryOverheadMb = sparkConf
.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverContainerMemoryMb).toInt,
MEMORY_OVERHEAD_MIN))
private val driverContainerMemoryWithOverhead = driverContainerMemoryMb + memoryOverheadMb
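The driver memory math above can be sketched standalone: the driver JVM and the submission server share one container, so their heaps are summed, and the overhead defaults to 10% of that sum with a 384 MB floor. This is an illustrative sketch; the object and method names are hypothetical, not Spark internals.

```scala
// Illustrative sketch of the driver container memory math; names are
// hypothetical, not the actual Spark internals.
object DriverMemoryMath {
  val MemoryOverheadFactor = 0.10
  val MemoryOverheadMinMb  = 384L

  // The driver JVM and the submission server run in one container for the
  // pod's whole lifetime, so their heaps are summed rather than max'd.
  def containerMemoryMb(driverMemoryMb: Long, submitServerMemoryMb: Long): Long =
    driverMemoryMb + submitServerMemoryMb

  // Default overhead: 10% of the combined heap, floored at 384 MB.
  def overheadMb(containerMb: Long): Long =
    math.max((MemoryOverheadFactor * containerMb).toLong, MemoryOverheadMinMb)

  // Total memory the container is sized for, heap plus overhead.
  def containerMemoryWithOverheadMb(driverMb: Long, serverMb: Long): Long = {
    val container = containerMemoryMb(driverMb, serverMb)
    container + overheadMb(container)
  }
}
```

For example, a 1024 MB driver with the default 256 MB submission server yields a 1280 MB combined heap, whose 10% overhead (128 MB) is floored up to 384 MB, for a 1664 MB total.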

private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)

private val secretBase64String = {
@@ -408,6 +421,12 @@ private[spark] class Client(
.withPath("/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
val driverMemoryQuantity = new QuantityBuilder(false)
.withAmount(s"${driverContainerMemoryMb}M")
.build()
val driverMemoryLimitQuantity = new QuantityBuilder(false)
.withAmount(s"${driverContainerMemoryWithOverhead}M")
.build()
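The two quantities above implement the usual request/limit split: the pod requests the combined heap, and is limited to heap plus overhead. A minimal sketch of the amount strings being built (a hypothetical helper, not part of the patch):

```scala
// Hypothetical sketch of the request/limit pair built above: request the
// combined heap, cap at heap plus overhead.
object PodMemory {
  def memoryAmounts(containerMb: Long, overheadMb: Long): (String, String) =
    (s"${containerMb}M", s"${containerMb + overheadMb}M")
}
```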
val driverPod = kubernetesClient.pods().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
@@ -442,7 +461,16 @@ private[spark] class Client(
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
// Note that SPARK_DRIVER_MEMORY only affects the REST server via spark-class.
.addNewEnv()
.withName(ENV_DRIVER_MEMORY)
.withValue(driverSubmitServerMemoryString)
.endEnv()
.addToEnv(sslConfiguration.sslPodEnvVars: _*)
.withNewResources()
.addToRequests("memory", driverMemoryQuantity)
.addToLimits("memory", driverMemoryLimitQuantity)
.endResources()
Reviewer: What were the defaults that we were relying on before?

Author: There was no reliable default; it probably just defaults to the underlying container runtime.
.withPorts(containerPorts.asJava)
.withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
.endContainer()
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

package object config {

@@ -104,7 +105,19 @@ package object config {
| overheads, etc. This tends to grow with the executor size
| (typically 6-10%).
""".stripMargin)
.stringConf
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
.doc("""
| The amount of off-heap memory (in megabytes) to be
| allocated for the driver and the driver submission server.
| This is memory that accounts for things like VM overheads,
| interned strings, other native overheads, etc. This tends
| to grow with the driver's memory size (typically 6-10%).
""".stripMargin)
.bytesConf(ByteUnit.MiB)
.createOptional

private[spark] val KUBERNETES_DRIVER_LABELS =
@@ -177,6 +190,14 @@ package object config {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
.doc("""
| The amount of memory to allocate for the driver submission server.
""".stripMargin)
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("256m")
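Switching from `.stringConf` to `.bytesConf(ByteUnit.MiB)` means strings like the "256m" default above are parsed into a MiB count at config-read time. An illustrative parser for the common suffixes, assuming nothing about Spark's actual bytesConf implementation (which handles more forms):

```scala
// Illustrative parser for JVM-style memory strings such as "256m"; Spark's
// real bytesConf / Utils handle more suffixes and edge cases.
object MemoryStrings {
  def memoryStringToMb(s: String): Long = {
    val lower = s.trim.toLowerCase
    val (digits, suffix) = lower.span(_.isDigit)
    require(digits.nonEmpty, s"no numeric part in '$s'")
    val value = digits.toLong
    suffix match {
      case "" | "m" | "mb" => value            // already mebibytes
      case "k" | "kb"      => value / 1024     // kibibytes down to MiB
      case "g" | "gb"      => value * 1024     // gibibytes up to MiB
      case other => throw new IllegalArgumentException(s"unsupported suffix: $other")
    }
  }
}
```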

private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
.doc("""
@@ -63,6 +63,7 @@ package object constants {
private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY"
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
Member: Where is this environment variable used? I didn't see any corresponding change to the Dockerfile launching the driver.
// Annotation keys
private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
@@ -74,4 +75,6 @@
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN = 384L
}
@@ -60,15 +60,16 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the driver pod name"))

private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)
private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
private val executorMemoryString = conf.get(
org.apache.spark.internal.config.EXECUTOR_MEMORY.key,
org.apache.spark.internal.config.EXECUTOR_MEMORY.defaultValueString)

private val memoryOverheadBytes = conf
private val memoryOverheadMb = conf
.get(KUBERNETES_EXECUTOR_MEMORY_OVERHEAD)
.map(overhead => Utils.byteStringAsBytes(overhead))
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryBytes).toInt,
.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * executorMemoryMb).toInt,
MEMORY_OVERHEAD_MIN))
private val executorMemoryWithOverhead = executorMemoryBytes + memoryOverheadBytes
private val executorMemoryWithOverhead = executorMemoryMb + memoryOverheadMb
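The review thread in the Client discussed converting a MiB count back into a memory string; the code above sidesteps that by fetching the raw config string alongside the parsed value. A sketch of what the reverse conversion could look like (a hypothetical helper, not something in Spark's Utils):

```scala
// Hypothetical reverse of Utils.memoryStringToMb: render a MiB count as a
// JVM-style memory string suitable for an env var like SPARK_EXECUTOR_MEMORY.
object MemoryStringRender {
  def mbToMemoryString(mb: Long): String =
    if (mb > 0 && mb % 1024 == 0) s"${mb / 1024}g"  // whole gibibytes
    else s"${mb}m"                                   // otherwise mebibytes
}
```

With such a helper, only the parsed Long would need to be kept, avoiding the repeated fetch of the same parameter in two forms.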

private val executorCores = conf.getOption("spark.executor.cores").getOrElse("1")

@@ -165,10 +166,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
val selectors = Map(SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> applicationId()).asJava
val executorMemoryQuantity = new QuantityBuilder(false)
.withAmount(executorMemoryBytes.toString)
.withAmount(s"${executorMemoryMb}M")
.build()
val executorMemoryLimitQuantity = new QuantityBuilder(false)
.withAmount(executorMemoryWithOverhead.toString)
.withAmount(s"${executorMemoryWithOverhead}M")
.build()
val executorCpuQuantity = new QuantityBuilder(false)
.withAmount(executorCores)
@@ -177,7 +178,7 @@
(ENV_EXECUTOR_PORT, executorPort.toString),
(ENV_DRIVER_URL, driverUrl),
(ENV_EXECUTOR_CORES, executorCores),
(ENV_EXECUTOR_MEMORY, executorMemory),
(ENV_EXECUTOR_MEMORY, executorMemoryString),
(ENV_APPLICATION_ID, applicationId()),
(ENV_EXECUTOR_ID, executorId)
).map(env => new EnvVarBuilder()
@@ -261,7 +262,5 @@ private[spark] class KubernetesClusterSchedulerBackend(

private object KubernetesClusterSchedulerBackend {
private val DEFAULT_STATIC_PORT = 10000
private val MEMORY_OVERHEAD_FACTOR = 0.10
private val MEMORY_OVERHEAD_MIN = 384L
private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
}