Merged

Commits (22):
2fc4efc
Clean up resources that are not used by pods.
mccheah May 25, 2017
c41a7fa
Make client side send correct credentials.
mccheah May 25, 2017
ef70ac2
Simplify cleanup logic.
mccheah May 25, 2017
2ce7968
Remove class
mccheah May 25, 2017
33b5938
Fix imports and line length.
mccheah May 25, 2017
0ea81c1
Remove import.
mccheah May 25, 2017
8fbdb24
Add a unit test for StagingResourcesStore.
mccheah May 26, 2017
161da02
Merge branch 'branch-2.1-kubernetes' into resource-staging-server-cle…
ash211 May 26, 2017
831b94f
Revamp cleanup process.
mccheah May 27, 2017
a360760
Merge branch 'resource-staging-server-cleanup' of github.com:apache-s…
mccheah May 27, 2017
d887081
Clarify log messages
mccheah May 30, 2017
b06ad41
Merge branch 'branch-2.1-kubernetes' into resource-staging-server-cle…
ash211 May 30, 2017
ce8a19e
Use a single set of credentials in resource staging server.
mccheah Jun 2, 2017
aeca760
Merge branch 'resource-staging-server-cleanup' of github.com:apache-s…
mccheah Jun 2, 2017
2f48823
Fix unit test.
mccheah Jun 2, 2017
1187b2e
Merge remote-tracking branch 'apache-spark-on-k8s/branch-2.1-kubernet…
mccheah Jun 2, 2017
e19a36f
Safe close if creating shuffle block handler fails
mccheah Jun 2, 2017
950bf92
Merge branch 'branch-2.1-kubernetes' into resource-staging-server-cle…
ash211 Jun 2, 2017
9a05f64
Use implicit class.
mccheah Jun 2, 2017
b79ae4c
Address comments.
mccheah Jun 3, 2017
3aad99a
Merge branch 'resource-staging-server-cleanup' of github.com:apache-s…
mccheah Jun 3, 2017
05586cf
Fix broken test.
mccheah Jun 3, 2017
63 changes: 63 additions & 0 deletions docs/running-on-kubernetes.md
@@ -450,6 +450,69 @@ from the other deployment modes. See the [configuration page](configuration.html
client cert file, and/or OAuth token.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.resourceStagingServer.caCertFile</code></td>
<td>(none)</td>
<td>
Path to the CA cert file for connecting to the Kubernetes API server over TLS from the resource staging server when
it monitors API objects to determine when to clean up resource bundles.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.resourceStagingServer.clientKeyFile</code></td>
<td>(none)</td>
<td>
Path to the client key file for authenticating against the Kubernetes API server from the resource staging server
when it monitors API objects to determine when to clean up resource bundles. The resource staging server must have
credentials that allow it to view API objects in any namespace.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.resourceStagingServer.clientCertFile</code></td>
<td>(none)</td>
<td>
Path to the client cert file for authenticating against the Kubernetes API server from the resource staging server
when it monitors API objects to determine when to clean up resource bundles. The resource staging server must have
credentials that allow it to view API objects in any namespace.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.resourceStagingServer.oauthToken</code></td>
<td>(none)</td>
<td>
OAuth token value for authenticating against the Kubernetes API server from the resource staging server
when it monitors API objects to determine when to clean up resource bundles. The resource staging server must have
credentials that allow it to view API objects in any namespace. Note that this cannot be set at the same time as
<code>spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile</code></td>
<td>(none)</td>
<td>
File containing the OAuth token to use when authenticating against the Kubernetes API server from the
resource staging server, when it monitors API objects to determine when to clean up resource bundles. The resource
staging server must have credentials that allow it to view API objects in any namespace. Note that this cannot be
set at the same time as <code>spark.kubernetes.authenticate.resourceStagingServer.oauthToken</code>.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.resourceStagingServer.useServiceAccountCredentials</code></td>
<td>true</td>
<td>
Whether or not to use a service account token and a service account CA certificate when the resource staging server
authenticates to Kubernetes. If this is set, interactions with Kubernetes will authenticate using a token located at
<code>/var/run/secrets/kubernetes.io/serviceaccount/token</code> and the CA certificate located at
<code>/var/run/secrets/kubernetes.io/serviceaccount/ca.crt</code>. Note that if
<code>spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile</code> is set, it takes precedence
over the service account token file. Also, if
<code>spark.kubernetes.authenticate.resourceStagingServer.caCertFile</code> is set, it takes precedence over
the service account's CA certificate file. This should generally be set to true (the default value) when the
resource staging server is deployed as a Kubernetes pod, but should be set to false if the resource staging server
is deployed by other means (i.e. when running the staging server process outside of Kubernetes). The resource
staging server must have credentials that allow it to view API objects in any namespace.
</td>
</tr>

Member:
The two options spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile and spark.kubernetes.authenticate.resourceStagingServer.caCertFile are already mentioned above in the table.

Author:
Yes, but here it's worth mentioning that those configurations take precedence over the service account credentials.
<tr>
<td><code>spark.kubernetes.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384</td>
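The precedence described in the <code>useServiceAccountCredentials</code> entry above mirrors the resolution logic in the SparkKubernetesClientFactory added later in this PR. A minimal sketch, assuming the standard service account mount path; resolveOauthTokenFile is an illustrative helper, not part of the PR:

```scala
import java.io.File

import org.apache.spark.SparkConf

// Sketch: an explicitly configured OAuth token file wins; otherwise fall
// back to the pod's mounted service account token, but only when
// useServiceAccountCredentials is true (the default).
object CredentialPrecedenceSketch {
  def resolveOauthTokenFile(conf: SparkConf): Option[File] = {
    val useServiceAccount = conf.getBoolean(
      "spark.kubernetes.authenticate.resourceStagingServer.useServiceAccountCredentials",
      defaultValue = true)
    conf.getOption("spark.kubernetes.authenticate.resourceStagingServer.oauthTokenFile")
      .map(new File(_))
      .orElse(
        Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/token"))
          .filter(_ => useServiceAccount))
  }
}
```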
KubernetesExternalShuffleService.scala

@@ -17,44 +17,42 @@

package org.apache.spark.deploy.kubernetes

import java.io.File
import java.nio.ByteBuffer

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientException, Watch, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.io.IOUtils
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver}
import org.apache.spark.network.util.TransportConf
import org.apache.spark.scheduler.cluster.kubernetes.DriverPodKubernetesClientProvider

/**
* An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes.
* It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
*/
private[spark] class KubernetesShuffleBlockHandler (
transportConf: TransportConf,
kubernetesClientProvider: DriverPodKubernetesClientProvider)
kubernetesClient: KubernetesClient)
extends ExternalShuffleBlockHandler(transportConf, null) with Logging {

private val INIT_AND_STOP_LOCK = new Object
private val CONNECTED_APPS_LOCK = new Object
private val connectedApps = mutable.Set.empty[String]
private var shuffleWatch: Option[Watch] = None
private var kubernetesClient: Option[KubernetesClient] = None

def start(): Unit = INIT_AND_STOP_LOCK.synchronized {
val client = kubernetesClientProvider.get
shuffleWatch = startShuffleWatcher(client)
kubernetesClient = Some(client)
shuffleWatch = startShuffleWatcher()
}

override def close(): Unit = {
@@ -64,8 +62,7 @@ private[spark] class KubernetesShuffleBlockHandler (
INIT_AND_STOP_LOCK.synchronized {
shuffleWatch.foreach(IOUtils.closeQuietly)
shuffleWatch = None
kubernetesClient.foreach(IOUtils.closeQuietly)
kubernetesClient = None
IOUtils.closeQuietly(kubernetesClient)
}
}
}
@@ -90,9 +87,9 @@ private[spark] class KubernetesShuffleBlockHandler (
}
}

private def startShuffleWatcher(client: KubernetesClient): Option[Watch] = {
private def startShuffleWatcher(): Option[Watch] = {
try {
Some(client
Some(kubernetesClient
.pods()
.withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
.watch(new Watcher[Pod] {
@@ -137,42 +134,55 @@
*/
private[spark] class KubernetesExternalShuffleService(
conf: SparkConf,
securityManager: SecurityManager,
kubernetesClientProvider: DriverPodKubernetesClientProvider)
securityManager: SecurityManager)
extends ExternalShuffleService(conf, securityManager) {

private var shuffleBlockHandlers: mutable.Buffer[KubernetesShuffleBlockHandler] = _
protected override def newShuffleBlockHandler(
tConf: TransportConf): ExternalShuffleBlockHandler = {
val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClientProvider)
newBlockHandler.start()

// TODO: figure out a better way of doing this.
// This is necessary because the constructor is not called
// when this class is initialized through ExternalShuffleService.
if (shuffleBlockHandlers == null) {
val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
conf.get(KUBERNETES_SHUFFLE_APISERVER_URI),
None,
APISERVER_AUTH_SHUFFLE_SERVICE_CONF_PREFIX,
conf,
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH))
.filter( _ => conf.get(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS)),
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))
.filter( _ => conf.get(KUBERNETES_SHUFFLE_USE_SERVICE_ACCOUNT_CREDENTIALS)))
val newBlockHandler = new KubernetesShuffleBlockHandler(tConf, kubernetesClient)
try {
newBlockHandler.start()
// TODO: figure out a better way of doing this.
// This is necessary because the constructor is not called
// when this class is initialized through ExternalShuffleService.
Reviewer:
How does the ExternalShuffleService instantiate the KubernetesExternalShuffleService without calling its constructor?

Author:
@foxish for this, I was also confused here. I don't see this being created reflectively in the surrounding code.

foxish (Member, Jun 3, 2017):
I think the comment isn't clear there. The issue, I think, is that ExternalShuffleService is extended by KubernetesExternalShuffleService, but ExternalShuffleService calls an overridden method within its own constructor:

  private val blockHandler = newShuffleBlockHandler(transportConf)

So, at that point, the constructor for KubernetesExternalShuffleService has not yet been called.

if (shuffleBlockHandlers == null) {
shuffleBlockHandlers = mutable.Buffer.empty[KubernetesShuffleBlockHandler]
}
shuffleBlockHandlers += newBlockHandler
newBlockHandler
} catch {
case e: Throwable =>
logError("Failed to create Kubernetes shuffle block handler.", e)
newBlockHandler.close()
throw e
}
shuffleBlockHandlers += newBlockHandler
newBlockHandler
}

override def stop(): Unit = {
try {
super.stop()
} finally {
shuffleBlockHandlers.foreach(_.close())
if (shuffleBlockHandlers != null) {
shuffleBlockHandlers.foreach(_.close())
}
}
}
}

private[spark] object KubernetesExternalShuffleService extends Logging {
def main(args: Array[String]): Unit = {
ExternalShuffleService.main(args,
(conf: SparkConf, sm: SecurityManager) => {
val kubernetesClientProvider = new DriverPodKubernetesClientProvider(conf)
new KubernetesExternalShuffleService(conf, sm, kubernetesClientProvider)
})
(conf: SparkConf, sm: SecurityManager) => new KubernetesExternalShuffleService(conf, sm))
}
}

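A minimal, self-contained sketch of the initialization-order behavior discussed in the review thread above; Base and Sub are hypothetical stand-ins for ExternalShuffleService and KubernetesExternalShuffleService. The `= _` default initializer on the buffer is what keeps the value assigned during the superclass constructor from being overwritten afterwards:

```scala
import scala.collection.mutable

class Base {
  // The superclass constructor invokes the overridden factory method...
  private val handler: AnyRef = newHandler()
  protected def newHandler(): AnyRef = new Object
}

class Sub extends Base {
  // `= _` leaves the field at the JVM default (null) rather than
  // re-assigning it after Base's constructor has run.
  private var handlers: mutable.Buffer[AnyRef] = _

  override protected def newHandler(): AnyRef = {
    // ...so this runs before Sub's constructor body, and `handlers`
    // is still null here, hence the defensive check.
    if (handlers == null) {
      handlers = mutable.Buffer.empty[AnyRef]
    }
    val h = new Object
    handlers += h
    h
  }
}

object InitOrderDemo extends App {
  new Sub() // exercises the null check during construction
}
```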
SparkKubernetesClientFactory.scala (new file)

@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.util.ThreadUtils

/**
* Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
* parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
* options for different components.
*/
private[spark] object SparkKubernetesClientFactory {

def createKubernetesClient(
master: String,
namespace: Option[String],
kubernetesAuthConfPrefix: String,
sparkConf: SparkConf,
maybeServiceAccountToken: Option[File],
maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
.map(new File(_))
.orElse(maybeServiceAccountToken)
val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
OptionRequirements.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
s" value $oauthTokenConf.")

val caCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
.orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
val clientKeyFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
val clientCertFile = sparkConf
.getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
val dispatcher = new Dispatcher(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
val config = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withWebsocketPingInterval(0)
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.withOption(namespace) {
(ns, configBuilder) => configBuilder.withNamespace(ns)
}.build()
val baseHttpClient = HttpClientUtils.createHttpClient(config)
val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
.dispatcher(dispatcher)
.build()
new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
}

private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {

def withOption[T]
(option: Option[T])
(configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = {
new OptionConfigurableConfigBuilder(option.map { opt =>
configurator(opt, configBuilder)
}.getOrElse(configBuilder))
}

def build(): Config = configBuilder.build()
}
}
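A hypothetical usage sketch of the factory above. The master URL and mount paths are illustrative; in the shuffle service change earlier in this PR, the prefix comes from APISERVER_AUTH_SHUFFLE_SERVICE_CONF_PREFIX and the paths from fabric8's Config constants:

```scala
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.SparkKubernetesClientFactory

object ClientFactoryUsageSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    // Credentials are resolved under the given prefix, falling back to the
    // pod's mounted service account token and CA certificate.
    val client = SparkKubernetesClientFactory.createKubernetesClient(
      master = "https://kubernetes.default.svc",
      namespace = None,
      kubernetesAuthConfPrefix = "spark.kubernetes.authenticate.resourceStagingServer",
      sparkConf = sparkConf,
      maybeServiceAccountToken =
        Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/token")),
      maybeServiceAccountCaCert =
        Some(new File("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")))
    try {
      // Use the client, e.g. client.pods().list()
    } finally {
      client.close()
    }
  }
}
```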