From ac8ce3864d2f50b1db3dcecc5eae7f8f225319b2 Mon Sep 17 00:00:00 2001 From: Hong Zhiguo Date: Thu, 22 Jun 2017 15:33:55 +0800 Subject: [PATCH] allow spark driver to find shuffle pods in specified namespace The conf property spark.kubernetes.shuffle.namespace is used to specify the namespace of shuffle pods. In normal cases, only one "shuffle daemonset" is deployed and shared by all spark pods. The spark driver should be able to list and watch shuffle pods in the namespace specified by the user. Note: by default, the spark driver pod doesn't have permission to list and watch shuffle pods in another namespace. Some action is needed to grant it that permission. For example, the ABAC policy below works. ``` {"apiVersion": "abac.authorization.kubernetes.io/v1beta1", "kind": "Policy", "spec": {"group": "system:serviceaccounts", "namespace": "SHUFFLE_NAMESPACE", "resource": "pods", "readonly": true}} ``` --- .../spark/scheduler/cluster/kubernetes/ShufflePodCache.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala index 53b4e745ce7c..15e02664589e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/ShufflePodCache.scala @@ -37,7 +37,8 @@ private[spark] class ShufflePodCache ( def start(): Unit = { // seed the initial cache. 
- val pods = client.pods().withLabels(dsLabels.asJava).list() + val pods = client.pods() + .inNamespace(dsNamespace).withLabels(dsLabels.asJava).list() pods.getItems.asScala.foreach { pod => if (Readiness.isReady(pod)) { @@ -50,6 +51,7 @@ private[spark] class ShufflePodCache ( watcher = client .pods() + .inNamespace(dsNamespace) .withLabels(dsLabels.asJava) .watch(new Watcher[Pod] { override def eventReceived(action: Watcher.Action, p: Pod): Unit = {