10 changes: 10 additions & 0 deletions docs/running-on-kubernetes.md
@@ -758,6 +758,16 @@ from the other deployment modes. See the [configuration page](configuration.html)
    Specify the hard cpu limit for a single executor pod
  </td>
</tr>
<tr>
  <td><code>spark.kubernetes.node.selector.[labelKey]</code></td>
  <td>(none)</td>
  <td>
    Adds an entry to the node selector of the driver pod and executor pods, with key <code>labelKey</code> and
    the configuration's value as the entry's value. For example, setting <code>spark.kubernetes.node.selector.identifier</code>
    to <code>myIdentifier</code> results in the driver pod and executors having a node selector with key
    <code>identifier</code> and value <code>myIdentifier</code>. Multiple node selector keys can be added by
    setting multiple configurations with this prefix.
  </td>
</tr>
</table>
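As a usage sketch, multiple selector entries can be combined by setting several configurations with this prefix. The `identifier` key mirrors the example above; `disktype` is a hypothetical second label:

```scala
import org.apache.spark.SparkConf

// Each spark.kubernetes.node.selector.* entry contributes one key/value pair
// to the nodeSelector of the driver and executor pods.
val conf = new SparkConf()
  .set("spark.kubernetes.node.selector.identifier", "myIdentifier")
  .set("spark.kubernetes.node.selector.disktype", "ssd")  // hypothetical label
// Resulting pod spec fragment:
//   nodeSelector:
//     identifier: myIdentifier
//     disktype: ssd
```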


@@ -65,4 +65,18 @@ object ConfigurationUtils extends Logging {
    }
    combined.toMap
  }

  /**
   * Gathers all configuration pairs whose keys start with `prefix`, returning them
   * with the prefix stripped, and fails if a stripped key maps to multiple values.
   */
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String,
      configType: String): Map[String, String] = {
    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
    fromPrefix.groupBy(_._1).foreach {
      case (key, values) =>
        require(values.size == 1,
          s"Cannot have multiple values for a given $configType key, got key $key with" +
            s" values $values")
    }
    fromPrefix.toMap
  }
}
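A minimal sketch of how this helper behaves with the node-selector prefix; the keys and values below are illustrative:

```scala
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .set("spark.kubernetes.node.selector.disktype", "ssd")
  .set("spark.kubernetes.node.selector.zone", "us-east-1a")

// getAllWithPrefix strips the prefix from the returned keys, so the result is
// Map("disktype" -> "ssd", "zone" -> "us-east-1a").
val selectors = ConfigurationUtils.parsePrefixedKeyValuePairs(
  sparkConf, "spark.kubernetes.node.selector.", "node selector")
```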
@@ -497,6 +497,8 @@ package object config extends Logging {
      .stringConf
      .createOptional

  private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

  private[spark] def resolveK8sMaster(rawMasterString: String): String = {
    if (!rawMasterString.startsWith("k8s://")) {
      throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
@@ -73,6 +73,8 @@ private[spark] class BaseDriverConfigurationStep(
        s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
          s" Spark bookkeeping operations.")
    val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
    val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
      submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX, "node selector")
    val driverCpuQuantity = new QuantityBuilder(false)
      .withAmount(driverCpuCores)
      .build()

@@ -117,6 +119,7 @@
        .endMetadata()
      .withNewSpec()
        .withRestartPolicy("Never")
        .withNodeSelector(nodeSelector.asJava)
        .endSpec()
      .build()
    val resolvedSparkConf = driverSpec.driverSparkConf.clone()
@@ -84,6 +84,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
      KUBERNETES_EXECUTOR_ANNOTATION_PREFIX,
      KUBERNETES_EXECUTOR_ANNOTATIONS,
      "executor annotation")
  private val nodeSelector =
    ConfigurationUtils.parsePrefixedKeyValuePairs(
      conf,
      KUBERNETES_NODE_SELECTOR_PREFIX,
      "node-selector")
  private var shufflePodCache: Option[ShufflePodCache] = None
  private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE)
  private val dockerImagePullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)

@@ -449,6 +454,7 @@
        .endMetadata()
      .withNewSpec()
        .withHostname(hostname)
        .withNodeSelector(nodeSelector.asJava)
        .endSpec()
      .build()
**Author:**
The main reason to add this is easy debugging: there are several kubelets, and I just want the driver and executors to run on a specified machine so that I can analyze the process (for example, look at CPU usage).
We had a YARN cluster running MR and Spark jobs, and there was a strong need for node labels (to run some jobs on high-memory machines), so I think Spark-on-k8s jobs need this too.

**Reviewer:**
For the high-memory machine use case, I would expect that setting an appropriately large memory request on the driver/executor pods would cause the k8s scheduler to place them only where they fit, which here would be the high-mem machine.
The performance benchmarking use case is a good one.
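A hedged sketch of that alternative, with hypothetical sizes: these memory settings surface as the pods' memory requests, so the k8s scheduler only binds the pods to nodes with enough allocatable memory.

```scala
import org.apache.spark.SparkConf

// Hypothetical sizes; spark.{driver,executor}.memory (plus overhead) become the
// pods' memory requests, which alone confine scheduling to nodes that fit them.
val conf = new SparkConf()
  .set("spark.driver.memory", "16g")
  .set("spark.executor.memory", "48g")
```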

**Author:**
High mem is just one example; there are other factors, like SSD or PPC.

**Member @foxish (Jun 22, 2017):**
I do think trying to restrict the nodes a job runs on is a use case several people will have. But I like the solution of using node affinity (an annotation until 1.5, a field in 1.6+), because it lets us express a superset of what we can express using node selectors.

**Member:**
It may be valuable to just have support for node selectors and then later have custom pod YAMLs (#38) for affinity, but we should have a discussion about this before adding this option.

**Member:**
@ash211 This node selector is not the same as the node affinity that I referred to as a 1.6+ feature. As @foxish mentioned, node affinity is a superset of node selectors; k8s added node affinity later to support more general use cases. From the reference doc:

> nodeSelector provides a very simple way to constrain pods to nodes with particular labels. The affinity/anti-affinity feature, currently in beta, greatly expands the types of constraints you can express.
> ...
> nodeSelector continues to work as usual, but will eventually be deprecated, as node affinity can express everything that nodeSelector can express.

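For contrast with the node-selector approach merged here, below is a rough sketch of the 1.6+ node-affinity field using fabric8's generated builders. The builder method names are assumptions based on fabric8's `withNew*`/`addNew*`/`end*` conventions and may differ across client versions; the `disktype`/`ssd` label is hypothetical.

```scala
import io.fabric8.kubernetes.api.model.AffinityBuilder

// Node affinity can express richer constraints than nodeSelector,
// e.g. set membership via the In operator.
val affinity = new AffinityBuilder()
  .withNewNodeAffinity()
    .withNewRequiredDuringSchedulingIgnoredDuringExecution()
      .addNewNodeSelectorTerm()
        .addNewMatchExpression()
          .withKey("disktype")
          .withOperator("In")
          .withValues("ssd")
        .endMatchExpression()
      .endNodeSelectorTerm()
    .endRequiredDuringSchedulingIgnoredDuringExecution()
  .endNodeAffinity()
  .build()
// A pod spec would then attach this via .withAffinity(affinity)
// inside the withNewSpec()/endSpec() block shown above.
```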