From a5abf4bd60fb1094014ae7fe825f0d191c4e1476 Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 14 Feb 2017 18:02:29 -0800 Subject: [PATCH] Use master environment ariable from KubernetesClusterSchedulerBackend --- .../deploy/kubernetes/KubernetesClientBuilder.scala | 11 +++++++---- .../apache/spark/deploy/kubernetes/constants.scala | 2 ++ .../KubernetesClusterSchedulerBackend.scala | 4 ++-- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala index 61d3ac17ac34..1d859dc72c8d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesClientBuilder.scala @@ -22,6 +22,8 @@ import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import org.apache.spark.deploy.kubernetes.constants._ + private[spark] object KubernetesClientBuilder { private val API_SERVER_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) private val CA_CERT_FILE = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) @@ -32,12 +34,13 @@ private[spark] object KubernetesClientBuilder { * are picked up from canonical locations, as they are injected * into the pod's disk space. */ - def buildFromWithinPod( - kubernetesMaster: String, - kubernetesNamespace: String): DefaultKubernetesClient = { + def buildFromWithinPod(kubernetesNamespace: String, useSsl: Boolean): DefaultKubernetesClient = { + val kubernetesHost = System.getenv(ENV_KUBERNETES_SERVICE_HOST) + val kubernetesPort = System.getenv(ENV_KUBERNETES_SERVICE_PORT) + val urlScheme = if (useSsl) "https" else "http" var clientConfigBuilder = new ConfigBuilder() .withApiVersion("v1") - .withMasterUrl(kubernetesMaster) + .withMasterUrl(s"$urlScheme://$kubernetesHost:$kubernetesPort") .withNamespace(kubernetesNamespace) if (CA_CERT_FILE.isFile) { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 027cc3c022b4..09b31304b80f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -63,6 +63,8 @@ package object constants { private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY" private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" + private[spark] val ENV_KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST" + private[spark] val ENV_KUBERNETES_SERVICE_PORT = "KUBERNETES_SERVICE_PORT" // Miscellaneous private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index d4e7da464be4..32f5b196d879 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -44,7 +44,6 @@ private[spark] class KubernetesClusterSchedulerBackend( private val EXECUTOR_MODIFICATION_LOCK = new Object private val runningExecutorPods = new scala.collection.mutable.HashMap[String, Pod] - private val kubernetesMaster = "https://kubernetes" private val executorDockerImage = conf.get(EXECUTOR_DOCKER_IMAGE) private val kubernetesNamespace = conf.get(KUBERNETES_NAMESPACE) private val executorPort = conf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) @@ -76,8 +75,9 @@ private[spark] class KubernetesClusterSchedulerBackend( private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("kubernetes-executor-requests")) + private val kubernetesUseSsl = Client.resolveK8sMaster(sc.master).startsWith("https") private val kubernetesClient = KubernetesClientBuilder - .buildFromWithinPod(kubernetesMaster, kubernetesNamespace) + .buildFromWithinPod(kubernetesNamespace, kubernetesUseSsl) private val driverPod = try { kubernetesClient.pods().inNamespace(kubernetesNamespace).