From 59556070f1b970a5a54739790778962d764c9132 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 24 Feb 2017 18:02:28 -0800 Subject: [PATCH 1/6] Listen for annotations that provide external URIs. --- docs/running-on-kubernetes.md | 42 +++- ...eploy.rest.kubernetes.DriverServiceManager | 2 + .../spark/deploy/kubernetes/Client.scala | 223 ++++++++++-------- .../KubernetesResourceCleaner.scala | 3 +- .../spark/deploy/kubernetes/config.scala | 21 ++ .../spark/deploy/kubernetes/constants.scala | 6 + .../kubernetes/DriverServiceManager.scala | 93 ++++++++ ...rnalSuppliedUrisDriverServiceManager.scala | 101 ++++++++ .../NodePortUrisDriverServiceManager.scala | 67 ++++++ .../ExternalUriProviderWatch.scala | 75 ++++++ .../integrationtest/KubernetesSuite.scala | 50 ++++ 11 files changed, 583 insertions(+), 100 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 9d49ac6829723..07190aea0e9bc 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -106,6 +106,33 @@ The above mechanism using `kubectl proxy` can be used when we have authenticatio kubernetes-client library does not support. Authentication using X509 Client Certs and oauth tokens is currently supported. +### Determining the Driver Base URI + +Kubernetes pods run with their own IP address space. If Spark is run in cluster mode, the driver pod may not be +accessible to the submitter. However, the submitter needs to send local dependencies from its local disk to the driver +pod. + +By default, Spark will place a [Service](https://kubernetes.io/docs/user-guide/services/#type-nodeport) with a NodePort +that is opened on every Kubelet. The submission client will then contact the driver at one of the node's addresses with +the appropriate service port. + +There may be cases where the Kubelet nodes cannot be reached by the submission client. For example, the cluster may +only be reachable through an external load balancer. The user may provide their own external IP for Spark driver +services. To use a your own external IP instead of a Kubelet's IP, first set +`spark.kubernetes.driver.serviceManagerType` to `ExternalAnnotation`. This will cause a service to be created that +routes to the driver pod with the annotation `spark-job.alpha.apache.org/provideExternalUri`. You will need to run a +process that watches the API server for services that are created with this annotation in the application's namespace +(set by `spark.kubernetes.namespace`). The process should determine a URI that routes to this service, and patch the +service to include an annotation `spark-job.alpha.apache.org/resolvedExternalUri`, which has its value as the external +URI that your process has provided. 
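For reference, a minimal sketch of such an external URI provider process is shown below. It is not part of this patch: it assumes the fabric8 Kubernetes client that Spark itself uses, and `resolveUriFor` together with the `spark-gateway.example.com` address are hypothetical stand-ins for however your environment maps a service to an externally routable URI (for example, by programming a load balancer).

```scala
import io.fabric8.kubernetes.api.model.Service
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

object ExternalUriProviderExample extends App {
  val client = new DefaultKubernetesClient()
  val namespace = "my-spark-namespace" // must match spark.kubernetes.namespace

  // Hypothetical: derive a URI that routes to this service from outside the cluster.
  def resolveUriFor(service: Service): String =
    s"https://spark-gateway.example.com/${service.getMetadata.getName}"

  client.services().inNamespace(namespace).watch(new Watcher[Service] {
    override def eventReceived(action: Action, service: Service): Unit = {
      val annotations = service.getMetadata.getAnnotations
      if (action == Action.ADDED
          && annotations != null
          && annotations.containsKey("spark-job.alpha.apache.org/provideExternalUri")
          && !annotations.containsKey("spark-job.alpha.apache.org/resolvedExternalUri")) {
        // Patch the service with the resolved URI so the submission client can proceed.
        client.services().inNamespace(namespace)
          .withName(service.getMetadata.getName)
          .edit()
            .editMetadata()
              .addToAnnotations(
                "spark-job.alpha.apache.org/resolvedExternalUri", resolveUriFor(service))
              .endMetadata()
          .done()
      }
    }

    override def onClose(cause: KubernetesClientException): Unit = ()
  })

  Thread.currentThread.join() // keep the process alive so the watch keeps running
}
```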
+ +Note that if the URI provided by the annotation also provides a base path, the base path should be removed when the +request is forwarded to the back end pod. + +If the above is confusing, keep in mind that this functionality is only necessary if the submitter cannot reach any of +the Kubelets at the driver's node port. It is recommended to use the default configuration with the node port service +whenever possible. + ### Spark Properties Below are some other common properties that are specific to Kubernetes. Most of the other configurations are the same @@ -199,7 +226,7 @@ from the other deployment modes. See the [configuration page](configuration.html false Whether to expose the driver Web UI port as a service NodePort. Turned off by default because NodePort is a limited - resource. Use alternatives such as Ingress if possible. + resource. @@ -217,6 +244,19 @@ from the other deployment modes. See the [configuration page](configuration.html Interval between reports of the current Spark job status in cluster mode. + + spark.kubernetes.driver.serviceManagerType + NodePort + + A tag indicating which class to use for creating the Kubernetes service and determining its URI for the submission + client. By default, a service is created with the NodePort type, and the driver will be contacted at one of the + kubelet nodes at the port that the Kubelets expose for the service. If the Kubelets cannot be contacted from the + submitter's machine, consider setting this to ExternalAnnotation as described in "Determining the + Driver Base URI" above. One may also include a custom implementation of + org.apache.spark.deploy.rest.kubernetes.DriverServiceManager on the submitter's classpath - + spark-submitload an instance of that class via Service Loading. This method should only be done as a last resort. 
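As a rough illustration of that last-resort route, a custom implementation could look like the sketch below. The package, class name, and gateway address are hypothetical; the trait it extends is the `DriverServiceManager` added in this patch, and the class must also be listed in a `META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager` file on the submitter's classpath so that the `ServiceLoader` can discover it.

```scala
package com.example.spark // hypothetical package

import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder}

import org.apache.spark.deploy.rest.kubernetes.DriverServiceManager

// Exposes the driver through a fixed, externally managed gateway (hypothetical).
class GatewayDriverServiceManager extends DriverServiceManager {

  // Selected by setting spark.kubernetes.driver.serviceManagerType=Gateway.
  override def getServiceManagerType: String = "Gateway"

  // No NodePort needed; the gateway routes to the service inside the cluster.
  override def customizeDriverService(template: ServiceBuilder): ServiceBuilder =
    template.editSpec().withType("ClusterIP").endSpec()

  override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] =
    Set(s"https://spark-gateway.example.com/${driverService.getMetadata.getName}")
}
```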
+ + ## Current Limitations diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager new file mode 100644 index 0000000000000..5a306335b4166 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.rest.kubernetes.DriverServiceManager @@ -0,0 +1,2 @@ +org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager +org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index c787d5917e381..0607a335abc25 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes import java.io.File import java.security.SecureRandom import java.util +import java.util.ServiceLoader import java.util.concurrent.{CountDownLatch, TimeUnit} import com.google.common.io.Files @@ -56,6 +57,7 @@ private[spark] class Client( private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + private val driverServiceManagerType = sparkConf.get(DRIVER_SERVICE_MANAGER_TYPE) private val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) @@ -91,6 +93,7 @@ private[spark] class Client( s" is a directory.") } val parsedCustomLabels = parseCustomLabels(customLabels) + val driverServiceManager = getDriverServiceManager var k8ConfBuilder = new K8SConfigBuilder() .withApiVersion("v1") .withMasterUrl(master) @@ -107,33 +110,50 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => - ShutdownHookManager.addShutdownHook(() => - kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) - val sslConfigurationProvider = new SslConfigurationProvider( - sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) - val submitServerSecret = kubernetesClient.secrets().createNew() - .withNewMetadata() - .withName(secretName) - .endMetadata() - .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) - .withType("Opaque") - .done() - kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) - try { - val sslConfiguration = sslConfigurationProvider.getSslConfiguration() - // start outer watch for status logging of driver pod - val driverPodCompletedLatch = new CountDownLatch(1) - // only enable interval logging if in waitForAppCompletion mode - val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 - val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, - loggingInterval) - Utils.tryWithResource(kubernetesClient - .pods() - .withName(kubernetesAppId) - .watch(loggingWatch)) { _ => + driverServiceManager.start(kubernetesClient, kubernetesAppId, 
sparkConf) + // Begin monitoring the state of the driver pod + val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 + val driverPodCompletedLatch = new CountDownLatch(1) + val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, + loggingInterval) + Utils.tryWithResource(kubernetesClient + .pods() + .withName(kubernetesAppId) + .watch(loggingWatch)) { _ => + val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() => + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) + val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook( + ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY)( + () => driverServiceManager.stop()) + // Place the error hook at a higher priority in order for the error hook to run before + // the stop hook. + val serviceManagerErrorHook = ShutdownHookManager.addShutdownHook( + ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY + 1)(() => + driverServiceManager.handleSubmissionError( + new SparkException("Submission shutting down early..."))) + try { + val sslConfigurationProvider = new SslConfigurationProvider( + sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner) + val submitServerSecret = kubernetesClient.secrets().createNew() + .withNewMetadata() + .withName(secretName) + .endMetadata() + .withData(Map((SUBMISSION_APP_SECRET_NAME, secretBase64String)).asJava) + .withType("Opaque") + .done() + kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) + val sslConfiguration = sslConfigurationProvider.getSslConfiguration() + // start outer watch for status logging of driver pod + // only enable interval logging if in waitForAppCompletion mode + val driverKubernetesSelectors = (Map( + SPARK_DRIVER_LABEL -> kubernetesAppId, + SPARK_APP_ID_LABEL -> kubernetesAppId, + SPARK_APP_NAME_LABEL -> appName) + ++ parsedCustomLabels) val (driverPod, driverService) = launchDriverKubernetesComponents( kubernetesClient, - parsedCustomLabels, + driverServiceManager, + driverKubernetesSelectors, submitServerSecret, sslConfiguration) configureOwnerReferences( @@ -144,6 +164,7 @@ private[spark] class Client( driverService) submitApplicationToDriverServer( kubernetesClient, + driverServiceManager, sslConfiguration, driverService, submitterLocalFiles, @@ -153,23 +174,43 @@ private[spark] class Client( // those. 
kubernetesResourceCleaner.unregisterResource(driverPod) kubernetesResourceCleaner.unregisterResource(driverService) - // wait if configured to do so - if (waitForAppCompletion) { - logInfo(s"Waiting for application $kubernetesAppId to finish...") - driverPodCompletedLatch.await() - logInfo(s"Application $kubernetesAppId finished.") - } else { - logInfo(s"Application $kubernetesAppId successfully launched.") + } catch { + case e: Throwable => + driverServiceManager.handleSubmissionError(e) + } finally { + Utils.tryLogNonFatalError { + kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) } + Utils.tryLogNonFatalError { + driverServiceManager.stop() + } + + // Remove the shutdown hooks that would be redundant + Utils.tryLogNonFatalError { + ShutdownHookManager.removeShutdownHook(resourceCleanShutdownHook) + } + Utils.tryLogNonFatalError { + ShutdownHookManager.removeShutdownHook(cleanupServiceManagerHook) + } + Utils.tryLogNonFatalError { + ShutdownHookManager.removeShutdownHook(serviceManagerErrorHook) + } + } + // wait if configured to do so + if (waitForAppCompletion) { + logInfo(s"Waiting for application $kubernetesAppId to finish...") + driverPodCompletedLatch.await() + logInfo(s"Application $kubernetesAppId finished.") + } else { + logInfo(s"Application $kubernetesAppId successfully launched.") } - } finally { - kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient) } } } private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, + driverServiceManager: DriverServiceManager, sslConfiguration: SslConfiguration, driverService: Service, submitterLocalFiles: Iterable[String], @@ -185,7 +226,10 @@ private[spark] class Client( sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString) sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString) - val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService, + val driverSubmitter = buildDriverSubmissionClient( + kubernetesClient, + driverServiceManager, + driverService, sslConfiguration) // Sanity check to see if the driver submitter is even reachable. 
driverSubmitter.ping() @@ -215,14 +259,10 @@ private[spark] class Client( private def launchDriverKubernetesComponents( kubernetesClient: KubernetesClient, - parsedCustomLabels: Map[String, String], + driverServiceManager: DriverServiceManager, + driverKubernetesSelectors: Map[String, String], submitServerSecret: Secret, sslConfiguration: SslConfiguration): (Pod, Service) = { - val driverKubernetesSelectors = (Map( - SPARK_DRIVER_LABEL -> kubernetesAppId, - SPARK_APP_ID_LABEL -> kubernetesAppId, - SPARK_APP_NAME_LABEL -> appName) - ++ parsedCustomLabels).asJava val endpointsReadyFuture = SettableFuture.create[Endpoints] val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture) val serviceReadyFuture = SettableFuture.create[Service] @@ -241,17 +281,15 @@ private[spark] class Client( .endpoints() .withName(kubernetesAppId) .watch(endpointsReadyWatcher)) { _ => - val driverService = createDriverService( - kubernetesClient, - driverKubernetesSelectors, - submitServerSecret) + val serviceTemplate = createDriverServiceTemplate(driverKubernetesSelectors) + val driverService = kubernetesClient.services().create( + driverServiceManager.customizeDriverService(serviceTemplate).build()) kubernetesResourceCleaner.registerOrUpdateResource(driverService) val driverPod = createDriverPod( kubernetesClient, - driverKubernetesSelectors, + driverKubernetesSelectors.asJava, submitServerSecret, sslConfiguration) - kubernetesResourceCleaner.registerOrUpdateResource(driverPod) waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture, serviceReadyFuture, podReadyFuture) (driverPod, driverService) @@ -340,28 +378,6 @@ private[spark] class Client( } } - private def createDriverService( - kubernetesClient: KubernetesClient, - driverKubernetesSelectors: java.util.Map[String, String], - submitServerSecret: Secret): Service = { - val driverSubmissionServicePort = new ServicePortBuilder() - .withName(SUBMISSION_SERVER_PORT_NAME) - .withPort(SUBMISSION_SERVER_PORT) - .withNewTargetPort(SUBMISSION_SERVER_PORT) - .build() - kubernetesClient.services().createNew() - .withNewMetadata() - .withName(kubernetesAppId) - .withLabels(driverKubernetesSelectors) - .endMetadata() - .withNewSpec() - .withType("NodePort") - .withSelector(driverKubernetesSelectors) - .withPorts(driverSubmissionServicePort) - .endSpec() - .done() - } - private def createDriverPod( kubernetesClient: KubernetesClient, driverKubernetesSelectors: util.Map[String, String], @@ -373,7 +389,7 @@ private[spark] class Client( .withPath("/v1/submissions/ping") .withNewPort(SUBMISSION_SERVER_PORT_NAME) .build() - kubernetesClient.pods().createNew() + val driverPod = kubernetesClient.pods().createNew() .withNewMetadata() .withName(kubernetesAppId) .withLabels(driverKubernetesSelectors) @@ -412,6 +428,26 @@ private[spark] class Client( .endContainer() .endSpec() .done() + kubernetesResourceCleaner.registerOrUpdateResource(driverPod) + driverPod + } + + private def createDriverServiceTemplate(driverKubernetesSelectors: Map[String, String]) + : ServiceBuilder = { + val driverSubmissionServicePort = new ServicePortBuilder() + .withName(SUBMISSION_SERVER_PORT_NAME) + .withPort(SUBMISSION_SERVER_PORT) + .withNewTargetPort(SUBMISSION_SERVER_PORT) + .build() + new ServiceBuilder() + .withNewMetadata() + .withName(kubernetesAppId) + .withLabels(driverKubernetesSelectors.asJava) + .endMetadata() + .withNewSpec() + .withSelector(driverKubernetesSelectors.asJava) + .withPorts(driverSubmissionServicePort) + .endSpec() } private class 
DriverPodReadyWatcher(resolvedDriverPod: SettableFuture[Pod]) extends Watcher[Pod] { @@ -562,36 +598,14 @@ private[spark] class Client( private def buildDriverSubmissionClient( kubernetesClient: KubernetesClient, + driverServiceManager: DriverServiceManager, service: Service, sslConfiguration: SslConfiguration): KubernetesSparkRestApi = { - val urlScheme = if (sslConfiguration.sslOptions.enabled) { - "https" - } else { - logWarning("Submitting application details, application secret, and local" + - " jars to the cluster over an insecure connection. You should configure SSL" + - " to secure this step.") - "http" - } - val servicePort = service.getSpec.getPorts.asScala - .filter(_.getName == SUBMISSION_SERVER_PORT_NAME) - .head.getNodePort - val nodeUrls = kubernetesClient.nodes.list.getItems.asScala - .filterNot(node => node.getSpec.getUnschedulable != null && - node.getSpec.getUnschedulable) - .flatMap(_.getStatus.getAddresses.asScala) - // The list contains hostnames, internal and external IP addresses. - // (https://kubernetes.io/docs/admin/node/#addresses) - // we want only external IP addresses and legacyHostIP addresses in our list - // legacyHostIPs are deprecated and will be removed in the future. - // (https://github.com/kubernetes/kubernetes/issues/9267) - .filter(address => address.getType == "ExternalIP" || address.getType == "LegacyHostIP") - .map(address => { - s"$urlScheme://${address.getAddress}:$servicePort" - }).toSet - require(nodeUrls.nonEmpty, "No nodes found to contact the driver!") + val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service) + require(serviceUris.nonEmpty, "No uris found to contact the driver!") HttpClientUtil.createClient[KubernetesSparkRestApi]( - uris = nodeUrls, - maxRetriesPerServer = 3, + uris = serviceUris, + maxRetriesPerServer = 10, sslSocketFactory = sslConfiguration .driverSubmitClientSslContext .getSocketFactory, @@ -619,6 +633,21 @@ private[spark] class Client( }).toMap }).getOrElse(Map.empty[String, String]) } + + private def getDriverServiceManager: DriverServiceManager = { + val driverServiceManagerLoader = ServiceLoader.load(classOf[DriverServiceManager]) + val matchingServiceManagers = driverServiceManagerLoader + .iterator() + .asScala + .filter(_.getServiceManagerType == driverServiceManagerType) + .toList + require(matchingServiceManagers.nonEmpty, + s"No driver service manager found matching type $driverServiceManagerType") + require(matchingServiceManagers.size == 1, "Multiple service managers found" + + s" matching type $driverServiceManagerType, got: " + + matchingServiceManagers.map(_.getClass).toList.mkString(",")) + matchingServiceManagers.head + } } private[spark] object Client extends Logging { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala index fb76b04604479..6360bc0e48948 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesResourceCleaner.scala @@ -23,8 +23,7 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class KubernetesResourceCleaner - extends Logging { +private[spark] class KubernetesResourceCleaner extends Logging { private val resources = 
mutable.HashMap.empty[(String, String), HasMetadata] diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index a21ec2101cc6e..a7d2441eb7944 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.kubernetes import java.util.concurrent.TimeUnit import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.deploy.rest.kubernetes.NodePortUrisDriverServiceManager import org.apache.spark.internal.config.ConfigBuilder package object config { @@ -146,6 +147,16 @@ package object config { .stringConf .createOptional + private[spark] val DRIVER_SUBMIT_SSL_ENABLED = + ConfigBuilder("spark.ssl.kubernetes.submit.enabled") + .doc(""" + | Whether or not to use SSL when sending the + | application dependencies to the driver pod. + | + """.stripMargin) + .booleanConf + .createWithDefault(false) + private[spark] val KUBERNETES_DRIVER_SERVICE_NAME = ConfigBuilder("spark.kubernetes.driver.service.name") .doc(""" @@ -174,6 +185,16 @@ package object config { .stringConf .createOptional + private[spark] val DRIVER_SERVICE_MANAGER_TYPE = + ConfigBuilder("spark.kubernetes.driver.serviceManagerType") + .doc(s""" + | A tag indicating which class to use for creating the + | Kubernetes service and determining its URI for the submission + | client. + """.stripMargin) + .stringConf + .createWithDefault(NodePortUrisDriverServiceManager.TYPE) + private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.kubernetes.submit.waitAppCompletion") .doc( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 688cd858e79ff..10ddb12463894 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -64,6 +64,12 @@ package object constants { private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" + // Annotation keys + private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI = + "spark-job.alpha.apache.org/provideExternalUri" + private[spark] val ANNOTATION_RESOLVED_EXTERNAL_URI = + "spark-job.alpha.apache.org/resolvedExternalUri" + // Miscellaneous private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver" private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala new file mode 100644 index 0000000000000..ff130a86ede39 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.rest.kubernetes + +import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf + +/** + * Implementations of this interface as responsible for exposing the driver pod by: + * - Creating a Kubernetes Service that is backed by the driver pod, and + * - Providing one or more URIs that the service can be reached at from the submission client. + * + * In general, one should not need to implement custom variants of this interface. Consider + * if the built-in service managers, NodePort and ExternalAnnotation, suit your needs first. + * + * This API is in an alpha state and may break without notice. + */ +trait DriverServiceManager { + + protected var kubernetesClient: KubernetesClient = _ + protected var serviceName: String = _ + protected var sparkConf: SparkConf = _ + + /** + * The tag that identifies this service manager type. This service manager will be loaded + * only if the Spark configuration spark.kubernetes.driver.serviceManagerType matches this + * value. + */ + def getServiceManagerType: String + + /** + * Guaranteed to be called before {@link createDriverService} or + * {@link getDriverServiceSubmissionServerUris} is called. If this is overridden, + * always make sure to invoke super(). + */ + def start( + kubernetesClient: KubernetesClient, + serviceName: String, + sparkConf: SparkConf): Unit = { + this.kubernetesClient = kubernetesClient + this.serviceName = serviceName + this.sparkConf = sparkConf + } + + /** + * Customize the driver service that overlays on the driver pod. + * + * Implementations are expected to take the service template and adjust it + * according to the particular needs of how the Service will be accessed by + * URIs provided in {@link getDriverServiceSubmissionServerUris}. + * + * @param driverServiceTemplate Base settings for the driver service. + * @return The same ServiceBuilder object with any required customizations. + */ + def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder + + /** + * Return the set of URIs that can be used to reach the submission server that + * is running on the driver pod. + */ + def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] + + /** + * Called when the Spark application ended up failing to start. Allows the service + * manager to clean up any state it may have created that should not be persisted + * in the case of an unsuccessful launch. Note that stop() is still called + * regardless if this method is called. + */ + def handleSubmissionError(cause: Throwable): Unit = {} + + /** + * Perform any cleanup of this service manager. If overridden, be sure to invoke + * the super implementation. 
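+   * For example, implementations that open Kubernetes watches in start() should
+   * close those watches here.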
+ */ + def stop() {} +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala new file mode 100644 index 0000000000000..4754e03bdcd19 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +private[spark] class ExternalUriSetWatcher(externalUriFuture: SettableFuture[String]) + extends Watcher[Service] with Logging { + + override def eventReceived(action: Action, service: Service): Unit = { + if (action == Action.MODIFIED && !externalUriFuture.isDone) { + service + .getMetadata + .getAnnotations + .asScala + .get(ANNOTATION_RESOLVED_EXTERNAL_URI) + .foreach(externalUriFuture.set) + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logDebug("External URI set watcher closed.", cause) + } +} + +private[spark] class ExternalSuppliedUrisDriverServiceManager + extends DriverServiceManager with Logging { + + private val externalUriFuture = SettableFuture.create[String] + private var externalUriSetWatch: Option[Watch] = None + + override def start( + kubernetesClient: KubernetesClient, + serviceName: String, + sparkConf: SparkConf): Unit = { + super.start(kubernetesClient, serviceName, sparkConf) + externalUriSetWatch = Some(kubernetesClient + .services() + .withName(serviceName) + .watch(new ExternalUriSetWatcher(externalUriFuture))) + } + + override def getServiceManagerType: String = ExternalSuppliedUrisDriverServiceManager.TYPE + + override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = { + val timeoutSeconds = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + require(externalUriSetWatch.isDefined, "The watch that listens for the provision of" + + " the external URI was not started; was start() called?") + Set(externalUriFuture.get(timeoutSeconds, TimeUnit.SECONDS)) + } + + 
override def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder = { + require(serviceName != null, "Service name was null; was start() called?") + driverServiceTemplate + .editMetadata() + .addToAnnotations(ANNOTATION_PROVIDE_EXTERNAL_URI, "true") + .endMetadata() + .editSpec() + .withType("ClusterIP") + .endSpec() + } + + override def stop(): Unit = { + super.stop() + Utils.tryLogNonFatalError { + externalUriSetWatch.foreach(_.close()) + externalUriSetWatch = None + } + } +} + +private[spark] object ExternalSuppliedUrisDriverServiceManager { + val TYPE = "ExternalAnnotation" +} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala new file mode 100644 index 0000000000000..1247c883d39e5 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.kubernetes + +import io.fabric8.kubernetes.api.model.{Service, ServiceBuilder} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.Logging + +private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManager with Logging { + + override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = { + val urlScheme = if (sparkConf.get(DRIVER_SUBMIT_SSL_ENABLED)) { + "https" + } else { + logWarning("Submitting application details, application secret, and local" + + " jars to the cluster over an insecure connection. You should configure SSL" + + " to secure this step.") + "http" + } + val servicePort = driverService.getSpec.getPorts.asScala + .filter(_.getName == SUBMISSION_SERVER_PORT_NAME) + .head.getNodePort + val nodeUrls = kubernetesClient.nodes.list.getItems.asScala + .filterNot(node => node.getSpec.getUnschedulable != null && + node.getSpec.getUnschedulable) + .flatMap(_.getStatus.getAddresses.asScala) + // The list contains hostnames, internal and external IP addresses. + // (https://kubernetes.io/docs/admin/node/#addresses) + // we want only external IP addresses and legacyHostIP addresses in our list + // legacyHostIPs are deprecated and will be removed in the future. 
+ // (https://github.com/kubernetes/kubernetes/issues/9267) + .filter(address => address.getType == "ExternalIP" || address.getType == "LegacyHostIP") + .map(address => { + s"$urlScheme://${address.getAddress}:$servicePort" + }).toSet + require(nodeUrls.nonEmpty, "No nodes found to contact the driver!") + nodeUrls + } + + override def getServiceManagerType: String = NodePortUrisDriverServiceManager.TYPE + + override def customizeDriverService(driverServiceTemplate: ServiceBuilder) + : ServiceBuilder = { + driverServiceTemplate.editSpec().withType("NodePort").endSpec() + } +} + +private[spark] object NodePortUrisDriverServiceManager { + val TYPE = "NodePort" +} \ No newline at end of file diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala new file mode 100644 index 0000000000000..3199a8c385f95 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.util.concurrent.atomic.AtomicBoolean + +import io.fabric8.kubernetes.api.model.Service +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.internal.Logging + +/** + * A slightly unrealistic implementation of external URI provision, but works + * for tests - essentially forces the service to revert back to being exposed + * on NodePort. 
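+ * It plays the role of the user-supplied external process described in the docs:
+ * when it sees the provide-external-uri annotation, it switches the service to
+ * NodePort and writes the resulting minikube URI back as the resolved annotation.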
+ */ +private[spark] class ExternalUriProviderWatch(kubernetesClient: KubernetesClient) + extends Watcher[Service] with Logging { + + // Visible for testing + val annotationSet = new AtomicBoolean(false) + + override def eventReceived(action: Action, service: Service): Unit = { + if (action == Action.ADDED) { + service.getMetadata + .getAnnotations + .asScala + .get(ANNOTATION_PROVIDE_EXTERNAL_URI).foreach { _ => + if (!annotationSet.getAndSet(true)) { + val nodePortService = kubernetesClient.services().withName(service.getMetadata.getName) + .edit() + .editSpec() + .withType("NodePort") + .endSpec() + .done() + val submissionServerPort = nodePortService + .getSpec() + .getPorts + .asScala + .find(_.getName == SUBMISSION_SERVER_PORT_NAME) + .map(_.getNodePort) + .getOrElse(throw new IllegalStateException("Submission server port not found.")) + val resolvedNodePortUri = s"http://${Minikube.getMinikubeIp}:$submissionServerPort" + kubernetesClient.services().withName(service.getMetadata.getName).edit() + .editMetadata() + .addToAnnotations(ANNOTATION_RESOLVED_EXTERNAL_URI, resolvedNodePortUri) + .endMetadata() + .done() + } + } + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logWarning("External URI provider watch closed.", cause) + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index fe171db15b3d1..d32bb0a47ad4f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -35,10 +35,13 @@ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkSubmit import org.apache.spark.deploy.kubernetes.Client +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils +import org.apache.spark.deploy.rest.kubernetes.ExternalSuppliedUrisDriverServiceManager import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import org.apache.spark.util.Utils @@ -108,6 +111,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .withGracePeriod(60) .delete }) + // spark-submit sets system properties so we have to clear them + new SparkConf(true).getAll.map(_._1).foreach { System.clearProperty } } override def afterAll(): Unit = { @@ -366,4 +371,49 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { s" with correct contents."), "Job did not find the file as expected.") } } + + test("Use external URI provider") { + val externalUriProviderWatch = new ExternalUriProviderWatch(minikubeKubernetesClient) + Utils.tryWithResource(minikubeKubernetesClient.services() + .withLabel("spark-app-name", "spark-pi") + .watch(externalUriProviderWatch)) { _ => + val args = Array( + "--master", 
s"k8s://https://${Minikube.getMinikubeIp}:8443", + "--deploy-mode", "cluster", + "--kubernetes-namespace", NAMESPACE, + "--name", "spark-pi", + "--executor-memory", "512m", + "--executor-cores", "1", + "--num-executors", "1", + "--jars", HELPER_JAR_FILE.getAbsolutePath, + "--class", SPARK_PI_MAIN_CLASS, + "--conf", "spark.ui.enabled=true", + "--conf", "spark.testing=false", + "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", + "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", + "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", + "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", + "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", + "--conf", "spark.kubernetes.submit.waitAppCompletion=false", + "--conf", s"${DRIVER_SERVICE_MANAGER_TYPE.key}=${ExternalSuppliedUrisDriverServiceManager.TYPE}", + EXAMPLES_JAR_FILE.getAbsolutePath) + SparkSubmit.main(args) + val sparkMetricsService = getSparkMetricsService("spark-pi") + expectationsForStaticAllocation(sparkMetricsService) + assert(externalUriProviderWatch.annotationSet.get) + val driverService = minikubeKubernetesClient + .services() + .withLabel("spark-app-name", "spark-pi") + .list() + .getItems + .asScala(0) + assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_PROVIDE_EXTERNAL_URI), + "External URI request annotation was not set on the driver service.") + // Unfortunately we can't check the correctness of the actual value of the URI, as it depends + // on the driver submission port set on the driver service but we remove that port from the + // service once the submission is complete. + assert(driverService.getMetadata.getAnnotations.containsKey(ANNOTATION_RESOLVED_EXTERNAL_URI), + "Resolved URI annotation not set on driver service.") + } + } } From ba505387c810a0b71f83bdb88bc203b89af5bcf4 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 24 Feb 2017 18:03:47 -0800 Subject: [PATCH 2/6] FIx scalstyle --- .../rest/kubernetes/NodePortUrisDriverServiceManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala index 1247c883d39e5..9c74867b17a72 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -64,4 +64,4 @@ private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManag private[spark] object NodePortUrisDriverServiceManager { val TYPE = "NodePort" -} \ No newline at end of file +} From b848ee17bee29716a9d614a44c2651ae49dc24a7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 24 Feb 2017 18:29:14 -0800 Subject: [PATCH 3/6] Address comments --- docs/running-on-kubernetes.md | 30 ++++---- .../spark/deploy/kubernetes/Client.scala | 5 +- .../kubernetes/DriverServiceManager.scala | 27 ++++--- ...rnalSuppliedUrisDriverServiceManager.scala | 72 ++++++++++--------- .../NodePortUrisDriverServiceManager.scala | 17 +++-- 5 files changed, 83 insertions(+), 68 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 07190aea0e9bc..ee7cf2c5a85ec 100644 
--- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -113,24 +113,24 @@ accessible to the submitter. However, the submitter needs to send local dependen pod. By default, Spark will place a [Service](https://kubernetes.io/docs/user-guide/services/#type-nodeport) with a NodePort -that is opened on every Kubelet. The submission client will then contact the driver at one of the node's addresses with -the appropriate service port. +that is opened on every node. The submission client will then contact the driver at one of the node's +addresses with the appropriate service port. -There may be cases where the Kubelet nodes cannot be reached by the submission client. For example, the cluster may +There may be cases where the nodes cannot be reached by the submission client. For example, the cluster may only be reachable through an external load balancer. The user may provide their own external IP for Spark driver -services. To use a your own external IP instead of a Kubelet's IP, first set -`spark.kubernetes.driver.serviceManagerType` to `ExternalAnnotation`. This will cause a service to be created that -routes to the driver pod with the annotation `spark-job.alpha.apache.org/provideExternalUri`. You will need to run a +services. To use a your own external IP instead of a node's IP, first set +`spark.kubernetes.driver.serviceManagerType` to `ExternalAnnotation`. A service will be created with the annotation +`spark-job.alpha.apache.org/provideExternalUri`, and this service routes to the driver pod. You will need to run a process that watches the API server for services that are created with this annotation in the application's namespace (set by `spark.kubernetes.namespace`). The process should determine a URI that routes to this service, and patch the service to include an annotation `spark-job.alpha.apache.org/resolvedExternalUri`, which has its value as the external -URI that your process has provided. +URI that your process has provided (e.g. `https://example.com:8080/my-job`). Note that if the URI provided by the annotation also provides a base path, the base path should be removed when the request is forwarded to the back end pod. If the above is confusing, keep in mind that this functionality is only necessary if the submitter cannot reach any of -the Kubelets at the driver's node port. It is recommended to use the default configuration with the node port service +the nodes at the driver's node port. It is recommended to use the default configuration with the node port service whenever possible. ### Spark Properties @@ -249,12 +249,14 @@ from the other deployment modes. See the [configuration page](configuration.html NodePort A tag indicating which class to use for creating the Kubernetes service and determining its URI for the submission - client. By default, a service is created with the NodePort type, and the driver will be contacted at one of the - kubelet nodes at the port that the Kubelets expose for the service. If the Kubelets cannot be contacted from the - submitter's machine, consider setting this to ExternalAnnotation as described in "Determining the - Driver Base URI" above. One may also include a custom implementation of - org.apache.spark.deploy.rest.kubernetes.DriverServiceManager on the submitter's classpath - - spark-submitload an instance of that class via Service Loading. This method should only be done as a last resort. + client. Valid values are currently NodePort and ExternalAnnotation. 
By default, a service + is created with the NodePort type, and the driver will be contacted at one of the kubelet nodes at the port that the + Kubelets expose for the service. If the Kubelets cannot be contacted from the submitter's machine, consider setting + this to ExternalAnnotation as described in "Determining the Driver Base URI" above. One may also + include a custom implementation of org.apache.spark.deploy.rest.kubernetes.DriverServiceManager on the + submitter's classpath - spark-submit loads an instance of that class via Service Loading. To use the custom + implementation, set this value to the custom implementation's return value of + DriverServiceManager#getServiceManagerType(). This method should only be done as a last resort. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 0607a335abc25..c7a9d34c4f7da 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -111,7 +111,8 @@ private[spark] class Client( val k8ClientConfig = k8ConfBuilder.build Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient => driverServiceManager.start(kubernetesClient, kubernetesAppId, sparkConf) - // Begin monitoring the state of the driver pod + // start outer watch for status logging of driver pod + // only enable interval logging if in waitForAppCompletion mode val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0 val driverPodCompletedLatch = new CountDownLatch(1) val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId, @@ -143,8 +144,6 @@ private[spark] class Client( .done() kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret) val sslConfiguration = sslConfigurationProvider.getSslConfiguration() - // start outer watch for status logging of driver pod - // only enable interval logging if in waitForAppCompletion mode val driverKubernetesSelectors = (Map( SPARK_DRIVER_LABEL -> kubernetesAppId, SPARK_APP_ID_LABEL -> kubernetesAppId, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala index ff130a86ede39..e1b24fd0f6362 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala @@ -23,7 +23,7 @@ import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkConf /** - * Implementations of this interface as responsible for exposing the driver pod by: + * Implementations of this interface are responsible for exposing the driver pod by: * - Creating a Kubernetes Service that is backed by the driver pod, and * - Providing one or more URIs that the service can be reached at from the submission client. * @@ -45,20 +45,25 @@ trait DriverServiceManager { */ def getServiceManagerType: String - /** - * Guaranteed to be called before {@link createDriverService} or - * {@link getDriverServiceSubmissionServerUris} is called. 
If this is overridden, - * always make sure to invoke super(). - */ - def start( + final def start( kubernetesClient: KubernetesClient, serviceName: String, sparkConf: SparkConf): Unit = { this.kubernetesClient = kubernetesClient this.serviceName = serviceName this.sparkConf = sparkConf + onStart(kubernetesClient, serviceName, sparkConf) } + /** + * Guaranteed to be called before {@link createDriverService} or + * {@link getDriverServiceSubmissionServerUris} is called. + */ + protected def onStart( + kubernetesClient: KubernetesClient, + serviceName: String, + sparkConf: SparkConf): Unit = {} + /** * Customize the driver service that overlays on the driver pod. * @@ -78,16 +83,18 @@ trait DriverServiceManager { def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] /** - * Called when the Spark application ended up failing to start. Allows the service + * Called when the Spark application failed to start. Allows the service * manager to clean up any state it may have created that should not be persisted * in the case of an unsuccessful launch. Note that stop() is still called * regardless if this method is called. */ def handleSubmissionError(cause: Throwable): Unit = {} + final def stop(): Unit = onStop() + /** - * Perform any cleanup of this service manager. If overridden, be sure to invoke + * Perform any cleanup of this service manager. * the super implementation. */ - def stop() {} + protected def onStop(): Unit = {} } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala index 4754e03bdcd19..257571b5a9d3e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/ExternalSuppliedUrisDriverServiceManager.scala @@ -30,36 +30,22 @@ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -private[spark] class ExternalUriSetWatcher(externalUriFuture: SettableFuture[String]) - extends Watcher[Service] with Logging { - - override def eventReceived(action: Action, service: Service): Unit = { - if (action == Action.MODIFIED && !externalUriFuture.isDone) { - service - .getMetadata - .getAnnotations - .asScala - .get(ANNOTATION_RESOLVED_EXTERNAL_URI) - .foreach(externalUriFuture.set) - } - } - - override def onClose(cause: KubernetesClientException): Unit = { - logDebug("External URI set watcher closed.", cause) - } -} - +/** + * Creates the service with an annotation that is expected to be detected by another process + * which the user provides and is not built in this project. When the external process detects + * the creation of the service with the appropriate annotation, it is expected to populate the + * value of a second annotation that is the URI of the driver submission server. 
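+ * The two annotation keys involved are spark-job.alpha.apache.org/provideExternalUri
+ * and spark-job.alpha.apache.org/resolvedExternalUri; getDriverServiceSubmissionServerUris
+ * blocks until the resolved URI appears or the driver submission timeout elapses.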
+ */ private[spark] class ExternalSuppliedUrisDriverServiceManager extends DriverServiceManager with Logging { private val externalUriFuture = SettableFuture.create[String] private var externalUriSetWatch: Option[Watch] = None - override def start( + override def onStart( kubernetesClient: KubernetesClient, serviceName: String, sparkConf: SparkConf): Unit = { - super.start(kubernetesClient, serviceName, sparkConf) externalUriSetWatch = Some(kubernetesClient .services() .withName(serviceName) @@ -68,26 +54,25 @@ private[spark] class ExternalSuppliedUrisDriverServiceManager override def getServiceManagerType: String = ExternalSuppliedUrisDriverServiceManager.TYPE - override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = { - val timeoutSeconds = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) - require(externalUriSetWatch.isDefined, "The watch that listens for the provision of" + - " the external URI was not started; was start() called?") - Set(externalUriFuture.get(timeoutSeconds, TimeUnit.SECONDS)) - } - override def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder = { require(serviceName != null, "Service name was null; was start() called?") driverServiceTemplate .editMetadata() - .addToAnnotations(ANNOTATION_PROVIDE_EXTERNAL_URI, "true") - .endMetadata() + .addToAnnotations(ANNOTATION_PROVIDE_EXTERNAL_URI, "true") + .endMetadata() .editSpec() - .withType("ClusterIP") - .endSpec() + .withType("ClusterIP") + .endSpec() } - override def stop(): Unit = { - super.stop() + override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = { + val timeoutSeconds = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + require(externalUriSetWatch.isDefined, "The watch that listens for the provision of" + + " the external URI was not started; was start() called?") + Set(externalUriFuture.get(timeoutSeconds, TimeUnit.SECONDS)) + } + + override def onStop(): Unit = { Utils.tryLogNonFatalError { externalUriSetWatch.foreach(_.close()) externalUriSetWatch = None @@ -99,3 +84,22 @@ private[spark] object ExternalSuppliedUrisDriverServiceManager { val TYPE = "ExternalAnnotation" } +private[spark] class ExternalUriSetWatcher(externalUriFuture: SettableFuture[String]) + extends Watcher[Service] with Logging { + + override def eventReceived(action: Action, service: Service): Unit = { + if (action == Action.MODIFIED && !externalUriFuture.isDone) { + service + .getMetadata + .getAnnotations + .asScala + .get(ANNOTATION_RESOLVED_EXTERNAL_URI) + .foreach(externalUriFuture.set) + } + } + + override def onClose(cause: KubernetesClientException): Unit = { + logDebug("External URI set watcher closed.", cause) + } +} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala index 9c74867b17a72..fa8362677f38f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala @@ -23,8 +23,18 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ import org.apache.spark.internal.Logging +/** + * Creates the service with an open NodePort. 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala
index 9c74867b17a72..fa8362677f38f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/NodePortUrisDriverServiceManager.scala
@@ -23,8 +23,18 @@ import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
 import org.apache.spark.internal.Logging
 
+/**
+ * Creates the service with an open NodePort. The URI to reach the submission server is thus
+ * at the address of any of the nodes through the service's node port.
+ */
 private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManager with Logging {
 
+  override def getServiceManagerType: String = NodePortUrisDriverServiceManager.TYPE
+
+  override def customizeDriverService(driverServiceTemplate: ServiceBuilder): ServiceBuilder = {
+    driverServiceTemplate.editSpec().withType("NodePort").endSpec()
+  }
+
   override def getDriverServiceSubmissionServerUris(driverService: Service): Set[String] = {
     val urlScheme = if (sparkConf.get(DRIVER_SUBMIT_SSL_ENABLED)) {
       "https"
@@ -53,13 +63,6 @@ private[spark] class NodePortUrisDriverServiceManager extends DriverServiceManag
     require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")
     nodeUrls
   }
-
-  override def getServiceManagerType: String = NodePortUrisDriverServiceManager.TYPE
-
-  override def customizeDriverService(driverServiceTemplate: ServiceBuilder)
-    : ServiceBuilder = {
-    driverServiceTemplate.editSpec().withType("NodePort").endSpec()
-  }
 }
 
 private[spark] object NodePortUrisDriverServiceManager {
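To illustrate the comment's claim that any node address plus the service's node port reaches the submission server, here is a standalone sketch; the client usage and the service name are invented here, and this is not the patch's own code:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient

object NodePortUriSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    val urlScheme = "http" // "https" when the driver submission server uses SSL
    val service = client.services().withName("spark-driver-service-example").get()
    // The port that every node opens for this service.
    val nodePort = service.getSpec.getPorts.asScala.head.getNodePort
    // Any node address should route to the driver through that port; the submission
    // client can try each candidate in turn.
    val nodeUrls = client.nodes().list().getItems.asScala
      .flatMap(_.getStatus.getAddresses.asScala.map(_.getAddress))
      .map(address => s"$urlScheme://$address:$nodePort")
      .toSet
    nodeUrls.foreach(println)
  }
}
```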
From 95c54a9733fde886fa31f241488f89386d4a45a2 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 28 Feb 2017 12:21:47 -0800
Subject: [PATCH 4/6] Fix doc style

---
 .../spark/deploy/rest/kubernetes/DriverServiceManager.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala
index e1b24fd0f6362..d92c0247e2a35 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/DriverServiceManager.scala
@@ -56,9 +56,9 @@ trait DriverServiceManager {
   }
 
   /**
-    * Guaranteed to be called before {@link createDriverService} or
-    * {@link getDriverServiceSubmissionServerUris} is called.
-    */
+   * Guaranteed to be called before {@link createDriverService} or
+   * {@link getDriverServiceSubmissionServerUris} is called.
+   */
   protected def onStart(
       kubernetesClient: KubernetesClient,
       serviceName: String,

From 664d82f537aa1a5a365356d5ac0d9d2adfe31dc1 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 28 Feb 2017 15:28:09 -0800
Subject: [PATCH 5/6] Docs updates

---
 docs/running-on-kubernetes.md | 23 ++++++++++++-----------
 1 file changed, 12 insertions(+), 11 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index ee7cf2c5a85ec..b3da2049db7bd 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -117,14 +117,15 @@ that is opened on every node. The submission client will then contact the driver
 addresses with the appropriate service port.
 
 There may be cases where the nodes cannot be reached by the submission client. For example, the cluster may
-only be reachable through an external load balancer. The user may provide their own external IP for Spark driver
-services. To use a your own external IP instead of a node's IP, first set
+only be reachable through an external load balancer. The user may provide their own external URI for Spark driver
+services. To use your own external URI instead of a node's IP and node port, first set
 `spark.kubernetes.driver.serviceManagerType` to `ExternalAnnotation`. A service will be created with the
 annotation `spark-job.alpha.apache.org/provideExternalUri`, and this service routes to the driver pod. You will need to run a
-process that watches the API server for services that are created with this annotation in the application's namespace
-(set by `spark.kubernetes.namespace`). The process should determine a URI that routes to this service, and patch the
-service to include an annotation `spark-job.alpha.apache.org/resolvedExternalUri`, which has its value as the external
-URI that your process has provided (e.g. `https://example.com:8080/my-job`).
+separate process that watches the API server for services that are created with this annotation in the application's
+namespace (set by `spark.kubernetes.namespace`). The process should determine a URI that routes to this service
+(potentially configuring infrastructure to handle the URI behind the scenes), and patch the service to include an
+annotation `spark-job.alpha.apache.org/resolvedExternalUri`, which has its value as the external URI that your process
+has provided (e.g. `https://example.com:8080/my-job`).
 
 Note that if the URI provided by the annotation also provides a base path, the base path should be removed when the
 request is forwarded to the back end pod.
@@ -250,11 +251,11 @@ from the other deployment modes. See the [configuration page](configuration.html
   A tag indicating which class to use for creating the Kubernetes service and determining its URI for the submission
   client. Valid values are currently NodePort and ExternalAnnotation. By default, a service
-  is created with the NodePort type, and the driver will be contacted at one of the kubelet nodes at the port that the
-  Kubelets expose for the service. If the Kubelets cannot be contacted from the submitter's machine, consider setting
-  this to ExternalAnnotation as described in "Determining the Driver Base URI" above. One may also
-  include a custom implementation of org.apache.spark.deploy.rest.kubernetes.DriverServiceManager on the
-  submitter's classpath - spark-submit loads an instance of that class via Service Loading. To use the custom
+  is created with the NodePort type, and the driver will be contacted at one of the nodes at the port
+  that the nodes expose for the service. If the nodes cannot be contacted from the submitter's machine, consider
+  setting this to ExternalAnnotation as described in "Determining the Driver Base URI" above. One may
+  also include a custom implementation of org.apache.spark.deploy.rest.kubernetes.DriverServiceManager on
+  the submitter's classpath - spark-submit loads an instance of that class via service loading. To use the custom
   implementation, set this value to the custom implementation's return value of
   DriverServiceManager#getServiceManagerType(). This method should only be done as a last resort.
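As a usage note for the table text above, a submitter selecting the service manager in code would set the documented keys on its `SparkConf`. A minimal sketch, with a made-up namespace and app name:

```scala
import org.apache.spark.SparkConf

object ServiceManagerConfSketch {
  val conf: SparkConf = new SparkConf()
    .setAppName("external-uri-example")
    .set("spark.kubernetes.namespace", "my-spark-namespace")
    // "NodePort" is the default. "ExternalAnnotation" switches to the external-URI flow;
    // a custom DriverServiceManager on the classpath is selected by the value its
    // getServiceManagerType() returns.
    .set("spark.kubernetes.driver.serviceManagerType", "ExternalAnnotation")
}
```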
From 343ab54415abf19612aeb345f4ffeedfb249a715 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 28 Feb 2017 15:42:16 -0800
Subject: [PATCH 6/6] Clearly explain path rewrites

---
 docs/running-on-kubernetes.md | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b3da2049db7bd..9a0714e4596a4 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -127,8 +127,10 @@ namespace (set by `spark.kubernetes.namespace`). The process should determine a
 annotation `spark-job.alpha.apache.org/resolvedExternalUri`, which has its value as the external URI that your process
 has provided (e.g. `https://example.com:8080/my-job`).
 
-Note that if the URI provided by the annotation also provides a base path, the base path should be removed when the
-request is forwarded to the back end pod.
+Note that the URI provided in the annotation needs to route traffic to the appropriate destination on the pod, which
+expects an empty path portion of the URI. This means the external URI provider will likely need to rewrite the path
+from the external URI to the destination on the pod, e.g. `https://example.com:8080/spark-app-1/submit` will need to
+route traffic to `https://<pod-ip>:<port>/submit`. Note that the paths of these two URLs are different.
 
 If the above is confusing, keep in mind that this functionality is only necessary if the submitter cannot reach any of
 the nodes at the driver's node port. It is recommended to use the default configuration with the node port service
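The path rewrite described in the hunk above amounts to stripping the externally visible base path before forwarding. A toy sketch of that logic, with all values invented:

```scala
import java.net.URI

object PathRewriteSketch {
  // Strips the external base path so the forwarded request matches what the driver
  // pod serves, e.g. rewrite("https://example.com:8080/spark-app-1/submit",
  // "/spark-app-1", "10.0.0.5", 7077) == "https://10.0.0.5:7077/submit".
  def rewrite(externalUri: String, basePath: String, podIp: String, podPort: Int): String = {
    val uri = new URI(externalUri)
    val forwardedPath = uri.getPath.stripPrefix(basePath)
    s"${uri.getScheme}://$podIp:$podPort$forwardedPath"
  }
}
```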