diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 54f39f7620e5..a5ad04518de3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -50,7 +50,8 @@ class HistoryServer(
conf: SparkConf,
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
- port: Int)
+ port: Int,
+ basePath: String = "")
- extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
+ extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf,
+ basePath)
with Logging with UIRoot with ApplicationCacheOperations {
@@ -279,6 +280,7 @@ object HistoryServer extends Logging {
.asInstanceOf[ApplicationHistoryProvider]
val port = conf.getInt("spark.history.ui.port", 18080)
+ val basePath = conf.get("spark.history.ui.basePath", "")
- val server = new HistoryServer(conf, provider, securityManager, port)
+ val server = new HistoryServer(conf, provider, securityManager, port, basePath)
server.bind()
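A minimal usage sketch for the new History Server setting, assuming the server sits behind a reverse proxy (the proxy path below is an example, not part of the patch):

```scala
import org.apache.spark.SparkConf

// With the base path set, the History Server UI is served under that prefix,
// e.g. http://<host>:18080/proxy/spark-history/ behind the proxy.
val conf = new SparkConf()
  .set("spark.history.ui.port", "18080")
  .set("spark.history.ui.basePath", "/proxy/spark-history")
```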
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index 17bc04303fa8..2fd99c2e6545 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -203,9 +203,9 @@ private[v1] class ApiRootResource extends UIRootFromServletContext {
private[spark] object ApiRootResource {
- def getServletHandler(uiRoot: UIRoot): ServletContextHandler = {
+ def getServletHandler(uiRoot: UIRoot, basePath: String = ""): ServletContextHandler = {
val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
- jerseyContext.setContextPath("/api")
+ jerseyContext.setContextPath(s"${basePath.stripSuffix("/")}/api")
val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
holder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.spark.status.api.v1")
UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
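A quick check of the context-path computation above, using the base path that Client.scala sets for an application (the app id is an example):

```scala
val basePath = "/app-123/spark-ui"
val contextPath = s"${basePath.stripSuffix("/")}/api"
assert(contextPath == "/app-123/spark-ui/api") // REST API lives under the UI prefix

val emptyBase = ""
assert(s"${emptyBase.stripSuffix("/")}/api" == "/api") // empty base path keeps the old behavior
```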
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 7d31ac54a717..61ecdfdb3d58 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -50,6 +50,7 @@ private[spark] class SparkUI private (
val operationGraphListener: RDDOperationGraphListener,
var appName: String,
val basePath: String,
+ val apiRootResourceBasePath: String,
val startTime: Long)
extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
conf, basePath, "SparkUI")
@@ -73,7 +74,7 @@ private[spark] class SparkUI private (
attachTab(new ExecutorsTab(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
- attachHandler(ApiRootResource.getServletHandler(this))
+ attachHandler(ApiRootResource.getServletHandler(this, apiRootResourceBasePath))
// These should be POST only, but, the YARN AM proxy won't proxy POSTs
attachHandler(createRedirectHandler(
"/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST")))
@@ -160,8 +161,10 @@ private[spark] object SparkUI {
securityManager: SecurityManager,
appName: String,
startTime: Long): SparkUI = {
+ val basePath = conf.get("spark.ui.basePath", "")
create(Some(sc), conf, listenerBus, securityManager, appName,
- jobProgressListener = Some(jobProgressListener), startTime = startTime)
+ jobProgressListener = Some(jobProgressListener), startTime = startTime,
+ basePath = basePath, apiRootResourceBasePath = basePath)
}
def createHistoryUI(
@@ -197,6 +200,7 @@ private[spark] object SparkUI {
securityManager: SecurityManager,
appName: String,
basePath: String = "",
+ apiRootResourceBasePath: String = "",
jobProgressListener: Option[JobProgressListener] = None,
startTime: Long): SparkUI = {
@@ -220,6 +224,6 @@ private[spark] object SparkUI {
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
- appName, basePath, startTime)
+ appName, basePath, apiRootResourceBasePath, startTime)
}
}
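The UI now carries two base paths. A sketch of the distinction, inferred from the call sites in this patch (the app id is an example):

```scala
// basePath prefixes the rendered UI pages; apiRootResourceBasePath prefixes only
// the /api servlet context. In Kubernetes cluster mode both are set from
// spark.ui.basePath, while createHistoryUI leaves apiRootResourceBasePath at its
// default "" because the History Server mounts a single /api context of its own.
val basePath = "/app-123/spark-ui"   // example value of spark.ui.basePath
val uiPage  = s"$basePath/jobs/"     // a rendered UI page
val apiRoot = s"$basePath/api/v1"    // the REST API attached by ApiRootResource
```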
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index e5c7e9bb6944..848b52a332ee 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -187,6 +187,22 @@ from the other deployment modes. See the [configuration page](configuration.html
for bookkeeping purposes.
+ <tr>
+   <td><code>spark.kubernetes.driver.exposeIngress</code></td>
+   <td><code>false</code></td>
+   <td>
+     Whether to expose the driver submission server and the Spark UI through a Kubernetes Ingress.
+     When enabled, the submitting client passes application dependencies to the driver pod through
+     the Ingress, and the Ingress remains after submission to expose the Spark UI. This requires an
+     Ingress controller to be installed on the cluster.
+   </td>
+ </tr>
+ <tr>
+   <td><code>spark.kubernetes.driver.ingressBasePath</code></td>
+   <td>(none)</td>
+   <td>
+     Base path for the Ingress created for the driver. Must be provided if
+     <code>spark.kubernetes.driver.exposeIngress</code> is <code>true</code>.
+   </td>
+ </tr>
<tr>
  <td><code>spark.kubernetes.driverSubmitTimeout</code></td>
  <td><code>60s</code></td>
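A usage sketch of the two new settings together (the ingress host below is an example, standing in for whatever host the cluster's Ingress controller exposes):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.kubernetes.driver.exposeIngress", "true")
  // Host (and optional port) under which the Ingress controller is reachable.
  .set("spark.kubernetes.driver.ingressBasePath", "ingress.example.com")
```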
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index aa273a024f6f..2032c0fb057f 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -26,6 +26,7 @@ import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.extensions.Ingress
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
@@ -64,6 +65,9 @@ private[spark] class Client(
private val sparkFiles = sparkConf.getOption("spark.files")
private val sparkJars = sparkConf.getOption("spark.jars")
+ private val useIngress = sparkConf.get(DRIVER_EXPOSE_INGRESS)
+ private val ingressBasePath = sparkConf.get(INGRESS_BASE_PATH)
+
private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
private val secretBase64String = {
@@ -77,6 +81,11 @@ private[spark] class Client(
def run(): Unit = {
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
+ if (useIngress) {
+ require(ingressBasePath.isDefined, "Ingress base path must be provided if" +
+ s" ${DRIVER_EXPOSE_INGRESS.key} is true")
+ }
+
val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
val submitterLocalJars = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars)
(submitterLocalFiles ++ submitterLocalJars).foreach { file =>
@@ -129,7 +138,7 @@ private[spark] class Client(
.pods()
.withName(kubernetesAppId)
.watch(loggingWatch)) { _ =>
- val (driverPod, driverService) = launchDriverKubernetesComponents(
+ val (driverPod, driverService, driverIngress) = launchDriverKubernetesComponents(
kubernetesClient,
parsedCustomLabels,
submitServerSecret,
@@ -139,16 +148,19 @@ private[spark] class Client(
sslVolumeMounts,
sslEnvs,
isKeyStoreLocalFile)
- val ownerReferenceConfiguredDriverService = try {
+ val (ownerReferenceConfiguredDriverService,
+ ownerReferenceConfiguredDriverIngress) = try {
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslSecrets,
driverPod,
- driverService)
+ driverService,
+ driverIngress)
} catch {
case e: Throwable =>
- cleanupPodAndService(kubernetesClient, driverPod, driverService)
+ cleanupAllDriverComponents(kubernetesClient, driverPod, driverService,
+ driverIngress)
throw new SparkException("Failed to set owner references to the driver pod.", e)
}
try {
@@ -164,8 +176,9 @@ private[spark] class Client(
}
} catch {
case e: Throwable =>
- cleanupPodAndService(kubernetesClient, driverPod,
- ownerReferenceConfiguredDriverService)
+ cleanupAllDriverComponents(kubernetesClient, driverPod,
+ ownerReferenceConfiguredDriverService,
+ ownerReferenceConfiguredDriverIngress)
throw new SparkException("Failed to submit the application to the driver pod.", e)
}
}
@@ -186,16 +199,20 @@ private[spark] class Client(
}
}
- private def cleanupPodAndService(
+ private def cleanupAllDriverComponents(
kubernetesClient: KubernetesClient,
driverPod: Pod,
- driverService: Service): Unit = {
+ driverService: Service,
+ driverIngress: Option[Ingress]): Unit = {
Utils.tryLogNonFatalError {
kubernetesClient.services().delete(driverService)
}
Utils.tryLogNonFatalError {
kubernetesClient.pods().delete(driverPod)
}
+ Utils.tryLogNonFatalError {
+ driverIngress.foreach(kubernetesClient.extensions().ingresses().delete(_))
+ }
}
private def submitApplicationToDriverServer(
@@ -215,8 +232,10 @@ private[spark] class Client(
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port",
DEFAULT_BLOCKMANAGER_PORT.toString)
+ sparkConf.set("spark.ui.basePath", s"/$kubernetesAppId/$UI_PATH_COMPONENT")
val driverSubmitter = buildDriverSubmissionClient(kubernetesClient, driverService,
driverSubmitSslOptions)
+
// Sanity check to see if the driver submitter is even reachable.
driverSubmitter.ping()
logInfo(s"Submitting local resources to driver pod for application " +
@@ -248,18 +267,22 @@ private[spark] class Client(
sslVolumes: Array[Volume],
sslVolumeMounts: Array[VolumeMount],
sslEnvs: Array[EnvVar],
- isKeyStoreLocalFile: Boolean): (Pod, Service) = {
- val endpointsReadyFuture = SettableFuture.create[Endpoints]
- val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
- val serviceReadyFuture = SettableFuture.create[Service]
+ isKeyStoreLocalFile: Boolean): (Pod, Service, Option[Ingress]) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
- val serviceReadyWatcher = new DriverServiceReadyWatcher(serviceReadyFuture)
+
val podReadyFuture = SettableFuture.create[Pod]
+ val endpointsReadyFuture = SettableFuture.create[Endpoints]
+ val serviceReadyFuture = SettableFuture.create[Service]
+ val ingressReadyFuture = ingressBasePath.map(_ => SettableFuture.create[Ingress])
+
val podWatcher = new DriverPodReadyWatcher(podReadyFuture)
+ val endpointsReadyWatcher = new DriverEndpointsReadyWatcher(endpointsReadyFuture)
+ val serviceReadyWatcher = new DriverServiceReadyWatcher(serviceReadyFuture)
+ val ingressWatcher = new DriverIngressReadyWatcher(ingressReadyFuture)
Utils.tryWithResource(kubernetesClient
.pods()
.withName(kubernetesAppId)
@@ -272,40 +295,53 @@ private[spark] class Client(
.endpoints()
.withName(kubernetesAppId)
.watch(endpointsReadyWatcher)) { _ =>
- val driverService = createDriverService(
- kubernetesClient,
- driverKubernetesSelectors,
- submitServerSecret)
- val driverPod = try {
- createDriverPod(
+ Utils.tryWithResource(kubernetesClient
+ .extensions()
+ .ingresses()
+ .withName(kubernetesAppId)
+ .watch(ingressWatcher)) { _ =>
+ val driverService = createDriverService(
kubernetesClient,
driverKubernetesSelectors,
- submitServerSecret,
- driverSubmitSslOptions,
- sslVolumes,
- sslVolumeMounts,
- sslEnvs)
- } catch {
- case e: Throwable =>
- Utils.tryLogNonFatalError {
- kubernetesClient.services().delete(driverService)
- }
- throw new SparkException("Failed to create the driver pod.", e)
- }
- try {
- waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
- serviceReadyFuture, podReadyFuture)
- (driverPod, driverService)
- } catch {
- case e: Throwable =>
- Utils.tryLogNonFatalError {
- kubernetesClient.services().delete(driverService)
- }
- Utils.tryLogNonFatalError {
- kubernetesClient.pods().delete(driverPod)
- }
- throw new SparkException("Timed out while waiting for a Kubernetes component to be" +
- " ready.", e)
+ submitServerSecret)
+ val driverPod = try {
+ createDriverPod(
+ kubernetesClient,
+ driverKubernetesSelectors,
+ submitServerSecret,
+ driverSubmitSslOptions,
+ sslVolumes,
+ sslVolumeMounts,
+ sslEnvs)
+ } catch {
+ case e: Throwable =>
+ Utils.tryLogNonFatalError {
+ kubernetesClient.services().delete(driverService)
+ }
+ throw new SparkException("Failed to create the driver pod.", e)
+ }
+ val driverIngress = try {
+ createDriverIngress(kubernetesClient, driverKubernetesSelectors, driverService)
+ } catch {
+ case e: Throwable =>
+ Utils.tryLogNonFatalError {
+ kubernetesClient.services().delete(driverService)
+ }
+ Utils.tryLogNonFatalError {
+ kubernetesClient.pods().delete(driverPod)
+ }
+ throw new SparkException("Failed to create the driver ingress.", e)
+ }
+ try {
+ waitForReadyKubernetesComponents(kubernetesClient, endpointsReadyFuture,
+ serviceReadyFuture, podReadyFuture, ingressReadyFuture)
+ (driverPod, driverService, driverIngress)
+ } catch {
+ case e: Throwable =>
+ cleanupAllDriverComponents(kubernetesClient, driverPod, driverService,
+ driverIngress)
+ throw new SparkException("Timed out while waiting for a Kubernetes component to" +
+ " be ready.", e)
+ }
}
}
}
@@ -323,7 +359,8 @@ private[spark] class Client(
submitServerSecret: Secret,
sslSecrets: Array[Secret],
driverPod: Pod,
- driverService: Service): Service = {
+ driverService: Service,
+ driverIngress: Option[Ingress]): (Service, Option[Ingress]) = {
val driverPodOwnerRef = new OwnerReferenceBuilder()
.withName(driverPod.getMetadata.getName)
.withUid(driverPod.getMetadata.getUid)
@@ -343,18 +380,30 @@ private[spark] class Client(
.addToOwnerReferences(driverPodOwnerRef)
.endMetadata()
.done()
- kubernetesClient.services().withName(driverService.getMetadata.getName).edit()
- .editMetadata()
- .addToOwnerReferences(driverPodOwnerRef)
- .endMetadata()
- .done()
+ val updatedService = kubernetesClient
+ .services()
+ .withName(driverService.getMetadata.getName)
+ .edit()
+ .editMetadata()
+ .addToOwnerReferences(driverPodOwnerRef)
+ .endMetadata()
+ .done()
+ val updatedIngress = driverIngress.map { ingress =>
+ kubernetesClient.extensions().ingresses().withName(ingress.getMetadata.getName).edit()
+ .editMetadata()
+ .addToOwnerReferences(driverPodOwnerRef)
+ .endMetadata()
+ .done()
+ }
+ (updatedService, updatedIngress)
}
private def waitForReadyKubernetesComponents(
kubernetesClient: KubernetesClient,
endpointsReadyFuture: SettableFuture[Endpoints],
serviceReadyFuture: SettableFuture[Service],
- podReadyFuture: SettableFuture[Pod]) = {
+ podReadyFuture: SettableFuture[Pod],
+ ingressReadyFuture: Option[SettableFuture[Ingress]]) = {
try {
podReadyFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
logInfo("Driver pod successfully created in Kubernetes cluster.")
@@ -380,6 +429,17 @@ private[spark] class Client(
throw new SparkException(s"The driver service endpoint was not ready" +
s" in $driverSubmitTimeoutSecs seconds.", e)
}
+ ingressReadyFuture.foreach { future =>
+ try {
+ future.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
+ logInfo("Driver ingress ready to proxy application submission request" +
+ " to driver service")
+ } catch {
+ case e: Throwable =>
+ throw new SparkException(s"The driver ingress was not ready" +
+ s" in $driverSubmitTimeoutSecs seconds.", e)
+ }
+ }
}
private def createDriverService(
@@ -397,7 +457,7 @@ private[spark] class Client(
.withLabels(driverKubernetesSelectors)
.endMetadata()
.withNewSpec()
- .withType("NodePort")
+ .withType(if (useIngress) "ClusterIP" else "NodePort")
.withSelector(driverKubernetesSelectors)
.withPorts(driverSubmissionServicePort)
.endSpec()
@@ -415,7 +475,7 @@ private[spark] class Client(
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (driverSubmitSslOptions.enabled) "HTTPS" else "HTTP")
- .withPath("/v1/submissions/ping")
+ .withPath(s"/$kubernetesAppId/$SUBMISSION_SERVER_PATH_COMPONENT/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
kubernetesClient.pods().createNew()
@@ -451,6 +511,10 @@ private[spark] class Client(
.withName(ENV_SUBMISSION_SERVER_PORT)
.withValue(SUBMISSION_SERVER_PORT.toString)
.endEnv()
+ .addNewEnv()
+ .withName(ENV_SUBMISSION_SERVER_BASE_PATH)
+ .withValue(s"/$kubernetesAppId/$SUBMISSION_SERVER_PATH_COMPONENT")
+ .endEnv()
.addToEnv(sslEnvs: _*)
.withPorts(containerPorts.asJava)
.withNewReadinessProbe().withHttpGet(probePingHttpGet).endReadinessProbe()
@@ -459,6 +523,40 @@ private[spark] class Client(
.done()
}
+ private def createDriverIngress(
+ kubernetesClient: KubernetesClient,
+ driverKubernetesSelectors: util.Map[String, String],
+ driverService: Service): Option[Ingress] = {
+ ingressBasePath.map(_ => {
+ kubernetesClient.extensions().ingresses().createNew()
+ .withNewMetadata()
+ .withName(kubernetesAppId)
+ .withLabels(driverKubernetesSelectors)
+ .endMetadata()
+ .withNewSpec()
+ .addNewRule()
+ .withNewHttp()
+ .addNewPath()
+ .withPath(s"/$kubernetesAppId/$SUBMISSION_SERVER_PATH_COMPONENT")
+ .withNewBackend()
+ .withServiceName(driverService.getMetadata.getName)
+ .withNewServicePort(SUBMISSION_SERVER_PORT_NAME)
+ .endBackend()
+ .endPath()
+ .addNewPath()
+ .withPath(s"/$kubernetesAppId/$UI_PATH_COMPONENT")
+ .withNewBackend()
+ .withServiceName(driverService.getMetadata.getName)
+ .withNewServicePort(UI_PORT_NAME)
+ .endBackend()
+ .endPath()
+ .endHttp()
+ .endRule()
+ .endSpec()
+ .done()
+ })
+ }
+
private class DriverPodReadyWatcher(resolvedDriverPod: SettableFuture[Pod]) extends Watcher[Pod] {
override def eventReceived(action: Action, pod: Pod): Unit = {
if ((action == Action.ADDED || action == Action.MODIFIED)
@@ -508,6 +606,22 @@ private[spark] class Client(
}
}
+ private class DriverIngressReadyWatcher(resolvedDriverIngress: Option[SettableFuture[Ingress]])
+ extends Watcher[Ingress] {
+
+ override def eventReceived(action: Action, ingress: Ingress): Unit = {
+ val ingressReady = (action == Action.ADDED || action == Action.MODIFIED) &&
+ Option(ingress.getStatus)
+ .flatMap(status => Option(status.getLoadBalancer))
+ .flatMap(lb => Option(lb.getIngress))
+ .exists(_.asScala.nonEmpty)
+ if (ingressReady) {
+ resolvedDriverIngress.foreach(_.set(ingress))
+ }
+ }
+ }
+
+ override def onClose(cause: KubernetesClientException): Unit = {
+ logDebug("Ingress readiness watch closed.", cause)
+ }
+
+ }
+
private def parseDriverSubmitSslOptions(): (SSLOptions, Boolean) = {
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
val resolvedSparkConf = sparkConf.clone()
@@ -719,19 +833,24 @@ private[spark] class Client(
val servicePort = service.getSpec.getPorts.asScala
.filter(_.getName == SUBMISSION_SERVER_PORT_NAME)
.head.getNodePort
- val nodeUrls = kubernetesClient.nodes.list.getItems.asScala
- .filterNot(node => node.getSpec.getUnschedulable != null &&
- node.getSpec.getUnschedulable)
- .flatMap(_.getStatus.getAddresses.asScala)
- // The list contains hostnames, internal and external IP addresses.
- // (https://kubernetes.io/docs/admin/node/#addresses)
- // we want only external IP addresses and legacyHostIP addresses in our list
- // legacyHostIPs are deprecated and will be removed in the future.
- // (https://github.com/kubernetes/kubernetes/issues/9267)
- .filter(address => address.getType == "ExternalIP" || address.getType == "LegacyHostIP")
- .map(address => {
- s"$urlScheme://${address.getAddress}:$servicePort"
- }).toSet
+ val serverBasePath = s"$kubernetesAppId/$SUBMISSION_SERVER_PATH_COMPONENT"
+ val nodeUrls = ingressBasePath.map(ingressBase => {
+ Set(s"$urlScheme://$ingressBase/$serverBasePath")
+ }).getOrElse {
+ kubernetesClient.nodes.list.getItems.asScala
+ .filterNot(node => node.getSpec.getUnschedulable != null &&
+ node.getSpec.getUnschedulable)
+ .flatMap(_.getStatus.getAddresses.asScala)
+ // The list contains hostnames, internal and external IP addresses.
+ // (https://kubernetes.io/docs/admin/node/#addresses)
+ // we want only external IP addresses and legacyHostIP addresses in our list
+ // legacyHostIPs are deprecated and will be removed in the future.
+ // (https://github.com/kubernetes/kubernetes/issues/9267)
+ .filter(address => address.getType == "ExternalIP" || address.getType == "LegacyHostIP")
+ .map(address => {
+ s"$urlScheme://${address.getAddress}:$servicePort/$serverBasePath"
+ }).toSet
+ }
require(nodeUrls.nonEmpty, "No nodes found to contact the driver!")
val (trustManager, sslContext): (X509TrustManager, SSLContext) =
if (driverSubmitSslOptions.enabled) {
@@ -739,9 +858,14 @@ private[spark] class Client(
} else {
(null, SSLContext.getDefault)
}
+ val maxRetriesPerServer = if (useIngress) {
+ SUBMISSION_CLIENT_RETRIES_INGRESS
+ } else {
+ SUBMISSION_CLIENT_RETRIES_NODE_PORT
+ }
HttpClientUtil.createClient[KubernetesSparkRestApi](
uris = nodeUrls,
- maxRetriesPerServer = 3,
+ maxRetriesPerServer = maxRetriesPerServer,
sslSocketFactory = sslContext.getSocketFactory,
trustContext = trustManager,
connectTimeoutMillis = 5000)
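Pulling the Client changes together, a sketch of the URLs that result for a single application (the app id and ingress host are examples; the path components mirror constants.scala):

```scala
val SUBMISSION_SERVER_PATH_COMPONENT = "submit-driver" // from constants.scala
val UI_PATH_COMPONENT = "spark-ui"                     // from constants.scala
val kubernetesAppId = "app-123"
val ingressBase = "ingress.example.com" // assumed spark.kubernetes.driver.ingressBasePath

// The Ingress routes both paths to the driver service, on different ports:
val submitUrl = s"https://$ingressBase/$kubernetesAppId/$SUBMISSION_SERVER_PATH_COMPONENT"
val uiUrl = s"https://$ingressBase/$kubernetesAppId/$UI_PATH_COMPONENT"
```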
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index ad83b0446538..c5e9e4c825f1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -183,4 +183,28 @@ package object config {
""".stripMargin)
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")
+
+ private[spark] val DRIVER_EXPOSE_INGRESS =
+ ConfigBuilder("spark.kubernetes.driver.exposeIngress")
+ .doc(
+ """
+ | Whether to expose the driver submission server and the Spark UI through a Kubernetes
+ | Ingress. When enabled, the submitting client passes application dependencies to the
+ | driver pod through the Ingress, and the Ingress remains after submission to expose
+ | the Spark UI. This requires an Ingress controller to be installed on the cluster.
+ """.stripMargin
+ )
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val INGRESS_BASE_PATH =
+ ConfigBuilder("spark.kubernetes.driver.ingressBasePath")
+ .doc(
+ s"""
+ | Base path for the ingress created for the driver. Must be provided if
+ | ${DRIVER_EXPOSE_INGRESS.key} is true.
+ """.stripMargin
+ )
+ .stringConf
+ .createOptional
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index 688cd858e79f..53890b89f063 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -50,6 +50,7 @@ package object constants {
// Environment Variables
private[spark] val ENV_SUBMISSION_SECRET_LOCATION = "SPARK_SUBMISSION_SECRET_LOCATION"
private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
+ private[spark] val ENV_SUBMISSION_SERVER_BASE_PATH = "SPARK_SUBMISSION_SERVER_BASE_PATH"
private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
"SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
@@ -64,8 +65,14 @@ package object constants {
private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
+ // Ingress paths
+ private[spark] val SUBMISSION_SERVER_PATH_COMPONENT = "submit-driver"
+ private[spark] val UI_PATH_COMPONENT = "spark-ui"
+
// Miscellaneous
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
private[spark] val KUBERNETES_SUBMIT_SSL_NAMESPACE = "kubernetes.submit"
+ private[spark] val SUBMISSION_CLIENT_RETRIES_NODE_PORT = 3
+ private[spark] val SUBMISSION_CLIENT_RETRIES_INGRESS = 20
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala
index 576f7058f20e..8b7786a21c4c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/HttpClientUtil.scala
@@ -20,8 +20,9 @@ import javax.net.ssl.{SSLContext, SSLSocketFactory, X509TrustManager}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import feign.{Client, Feign, Request, Response}
+import feign.{Client, Feign, Request, Response, RetryableException}
import feign.Request.Options
+import feign.codec.ErrorDecoder
import feign.jackson.{JacksonDecoder, JacksonEncoder}
import feign.jaxrs.JAXRSContract
import okhttp3.OkHttpClient
@@ -57,11 +58,23 @@ private[spark] object HttpClientUtil {
response
}
}
+ val defaultErrorDecoder = new ErrorDecoder.Default
+ val alwaysRetryErrorDecoder = new ErrorDecoder {
+ override def decode(methodKey: String, response: Response): Exception = {
+ defaultErrorDecoder.decode(methodKey, response) match {
+ case retryable: RetryableException => retryable
+ case e: Exception =>
+ new RetryableException("An error that is normally not retryable has" +
+ " occurred, but the client may retry anyway.", e, null)
+ }
+ }
+ }
Feign.builder()
.client(resetTargetHttpClient)
.contract(new JAXRSContract)
.encoder(new JacksonEncoder(objectMapper))
.decoder(new JacksonDecoder(objectMapper))
+ .errorDecoder(alwaysRetryErrorDecoder)
.options(new Options(connectTimeoutMillis, readTimeoutMillis))
.retryer(target)
.target(target)
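The always-retry decoder compensates for the window in which an Ingress accepts traffic before its routes are programmed; during that window the proxy can answer with errors (such as 404 or 503) that Feign's default decoder treats as fatal. A sketch of the classification, which the client bounds with the per-server retry budget from constants.scala (20 via ingress versus 3 via NodePort):

```scala
import feign.RetryableException

// Mirrors the decoder above: pass retryable errors through, wrap everything else.
def asRetryable(e: Exception): RetryableException = e match {
  case retryable: RetryableException => retryable
  case other => new RetryableException("Retrying a normally non-retryable error", other, null)
}
```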
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index f0b01b232098..846fd3f01798 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
private case class KubernetesSparkRestServerArguments(
host: Option[String] = None,
port: Option[Int] = None,
+ basePath: Option[String] = None,
useSsl: Boolean = false,
secretFile: Option[String] = None,
keyStoreFile: Option[String] = None,
@@ -45,6 +46,7 @@ private case class KubernetesSparkRestServerArguments(
def validate(): KubernetesSparkRestServerArguments = {
require(host.isDefined, "Hostname not set via --hostname.")
require(port.isDefined, "Port not set via --port")
+ require(basePath.isDefined, "Base path not set via --base-path")
require(secretFile.isDefined, "Secret file not set via --secret-file")
this
}
@@ -65,6 +67,9 @@ private object KubernetesSparkRestServerArguments {
case "--secret-file" :: value :: tail =>
args = tail
resolvedArguments.copy(secretFile = Some(value))
+ case "--base-path" :: value :: tail =>
+ args = tail
+ resolvedArguments.copy(basePath = Some(value))
case "--use-ssl" :: value :: tail =>
args = tail
resolvedArguments.copy(useSsl = value.toBoolean)
@@ -98,6 +103,7 @@ private object KubernetesSparkRestServerArguments {
private[spark] class KubernetesSparkRestServer(
host: String,
port: Int,
+ basePath: String,
conf: SparkConf,
expectedApplicationSecret: Array[Byte],
shutdownLock: CountDownLatch,
@@ -108,9 +114,12 @@ private[spark] class KubernetesSparkRestServer(
private val javaExecutable = s"${System.getenv("JAVA_HOME")}/bin/java"
private val sparkHome = System.getenv("SPARK_HOME")
private val securityManager = new SecurityManager(conf)
+
+ // super.baseContext has a leading slash
+ private val completeBaseContext = basePath + baseContext
override protected lazy val contextToServlet = Map[String, RestServlet](
- s"$baseContext/create/*" -> submitRequestServlet,
- s"$baseContext/ping/*" -> pingServlet)
+ s"$completeBaseContext/create/*" -> submitRequestServlet,
+ s"$completeBaseContext/ping/*" -> pingServlet)
private val pingServlet = new PingServlet
override protected val submitRequestServlet: SubmitRequestServlet
@@ -358,6 +367,7 @@ private[spark] object KubernetesSparkRestServer {
val server = new KubernetesSparkRestServer(
parsedArguments.host.get,
parsedArguments.port.get,
+ parsedArguments.basePath.get,
sparkConf,
secretBytes,
barrier,
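With the base path threaded through, the servlet mappings compose as below (the app id is an example; baseContext comes from the parent RestSubmissionServer and carries a leading slash, as noted in the code):

```scala
val basePath = "/app-123/submit-driver"
val baseContext = "/v1/submissions" // super.baseContext in RestSubmissionServer
val completeBaseContext = basePath + baseContext

// These match the readiness-probe path Client.scala sets on the driver pod:
assert(s"$completeBaseContext/ping/*" == "/app-123/submit-driver/v1/submissions/ping/*")
assert(s"$completeBaseContext/create/*" == "/app-123/submit-driver/v1/submissions/create/*")
```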
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
index 92fdfb8ac5f4..82f4a32c8a11 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver/Dockerfile
@@ -29,4 +29,5 @@ CMD SSL_ARGS="" && \
--hostname $HOSTNAME \
--port $SPARK_SUBMISSION_SERVER_PORT \
--secret-file $SPARK_SUBMISSION_SECRET_LOCATION \
+ --base-path $SPARK_SUBMISSION_SERVER_BASE_PATH \
${SSL_ARGS}
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 4c8069db2686..03c713b6bc06 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -247,7 +247,7 @@
wget
- https://storage.googleapis.com/minikube/releases/v0.12.2/minikube-linux-amd64
+ https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-linux-amd64
${project.build.directory}/minikube-bin/linux-amd64
minikube
@@ -259,7 +259,7 @@
wget
- https://storage.googleapis.com/minikube/releases/v0.12.2/minikube-darwin-amd64
+ https://storage.googleapis.com/minikube/releases/v0.16.0/minikube-darwin-amd64
${project.build.directory}/minikube-bin/darwin-amd64
minikube
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index fe171db15b3d..4100a9f3f709 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -35,10 +35,12 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.SparkSubmit
import org.apache.spark.deploy.kubernetes.Client
+import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.integrationtest.sslutil.SSLUtils
+import org.apache.spark.deploy.rest.kubernetes.HttpClientUtil
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils
@@ -83,6 +85,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
"changeit")
keyStoreFile = keyStore
trustStoreFile = trustStore
+ minikubeKubernetesClient.services().inNamespace("kube-system").createOrReplaceWithNew()
+ .withNewMetadata()
+ .withName("spark-integration-test-ingress-service")
+ .addToLabels("spark-integration-test-ingress-service", System.currentTimeMillis().toString)
+ .endMetadata()
+ .withNewSpec()
+ .withType("NodePort")
+ .addToSelector("app", "nginx-ingress-lb")
+ .addNewPort()
+ .withName("nginx-port")
+ .withPort(80)
+ .withNewTargetPort(80)
+ .withNodePort(31000) // A fixed node port is safe since we control the test environment
+ .endPort()
+ .endSpec()
+ .done()
}
before {
@@ -108,6 +126,9 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.withGracePeriod(60)
.delete
})
+ // spark-submit sets the system properties, so just reset all of them
+ val sparkConfFromSystemProperties = new SparkConf(true)
+ sparkConfFromSystemProperties.getAll.foreach(conf => System.clearProperty(conf._1))
}
override def afterAll(): Unit = {
@@ -124,7 +145,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.get(0)
.getMetadata
.getName
- Minikube.getService[SparkRestApiV1](serviceName, NAMESPACE, "spark-ui-port")
+ Minikube.getService[SparkRestApiV1](
+ serviceName,
+ NAMESPACE,
+ "spark-ui-port",
+ s"/$serviceName/$UI_PATH_COMPONENT")
}
private def expectationsForStaticAllocation(sparkMetricsService: SparkRestApiV1): Unit = {
@@ -366,4 +391,41 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
s" with correct contents."), "Job did not find the file as expected.")
}
}
+
+ test("Use ingress to submit the driver") {
+ val args = Array(
+ "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
+ "--deploy-mode", "cluster",
+ "--kubernetes-namespace", NAMESPACE,
+ "--name", "spark-pi",
+ "--executor-memory", "512m",
+ "--executor-cores", "1",
+ "--num-executors", "1",
+ "--jars", HELPER_JAR_FILE.getAbsolutePath,
+ "--class", SPARK_PI_MAIN_CLASS,
+ "--conf", "spark.ui.enabled=true",
+ "--conf", "spark.testing=false",
+ "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
+ "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
+ "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
+ "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
+ "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
+ "--conf", "spark.kubernetes.submit.waitAppCompletion=false",
+ "--conf", "spark.kubernetes.driver.exposeIngress=true",
+ "--conf", s"spark.kubernetes.driver.ingressBasePath=${Minikube.getMinikubeIp}:31000",
+ EXAMPLES_JAR_FILE.getAbsolutePath)
+ SparkSubmit.main(args)
+ val driverPod = minikubeKubernetesClient
+ .pods
+ .withLabel("spark-app-name", "spark-pi")
+ .list()
+ .getItems
+ .get(0)
+ // This time we build the metrics service using the nginx proxy
+ val metricsUrl = s"http://${Minikube.getMinikubeIp}:31000/" +
+ s"${driverPod.getMetadata.getName}/" +
+ s"$UI_PATH_COMPONENT"
+ val sparkMetricsService = HttpClientUtil.createClient[SparkRestApiV1](Set(metricsUrl))
+ expectationsForStaticAllocation(sparkMetricsService)
+ }
}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
index 07274bf962dd..e6e999d262b5 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
@@ -51,6 +51,7 @@ private[spark] object Minikube extends Logging {
assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE)
if (getMinikubeStatus != MinikubeStatus.RUNNING) {
executeMinikube("start", "--memory", "6000", "--cpus", "8")
+ executeMinikube("addons", "enable", "ingress")
} else {
logInfo("Minikube is already started.")
}