Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a457bce
Download remotely-located resources on driver startup. Use init-conta…
mccheah Apr 25, 2017
7bcb093
Fix owner reference slightly
mccheah Apr 28, 2017
ed5cbf7
Clean up config
mccheah Apr 28, 2017
ec8c4af
Don't rely too heavily on conventions that can change
mccheah Apr 28, 2017
970fb5f
Fix flaky test
mccheah Apr 28, 2017
92cb069
Tidy up file resolver
mccheah Apr 28, 2017
0d2cb6f
Whitespace arrangement
mccheah Apr 28, 2017
9cab55d
Indentation change
mccheah Apr 28, 2017
2fe1921
Fix more indentation
mccheah Apr 28, 2017
5dddbd3
Consolidate init container component providers
mccheah Apr 28, 2017
7d145a2
Minor method signature and comment changes
mccheah Apr 28, 2017
8ca6c72
Rename class for consistency
mccheah May 1, 2017
a371b1d
Merge remote-tracking branch 'apache-spark-on-k8s/branch-2.1-kubernet…
mccheah May 11, 2017
157d8a4
Resolve conflicts
mccheah May 11, 2017
d4bf83d
Fix flaky test
mccheah May 11, 2017
98dcaa3
Add some tests and some refactoring.
mccheah May 13, 2017
4f73bb0
Make naming consistent for Staged -> Submitted
mccheah May 13, 2017
44ec870
Add unit test for the submission client.
mccheah May 16, 2017
a436e59
Refine expectations
mccheah May 16, 2017
ad738c6
Rename variables and fix typos
mccheah May 16, 2017
0b60e99
Merge remote-tracking branch 'apache-spark-on-k8s/branch-2.1-kubernet…
mccheah May 16, 2017
6134dc1
Address more comments. Remove redundant SingleKeyConfigMap.
mccheah May 16, 2017
58dfb43
Minor test adjustments.
mccheah May 16, 2017
f214df2
add another test
mccheah May 16, 2017
903e545
Merge remote-tracking branch 'apache-spark-on-k8s/branch-2.1-kubernet…
mccheah May 17, 2017
7c379b2
Fix conflicts.
mccheah May 17, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}

import org.apache.spark.deploy.kubernetes.constants._

/**
 * Plugin that wires the Kubernetes Secret holding resource-staging-server credentials
 * into both the init-container and the pod that hosts it. Implementations mutate the
 * provided fabric8 builders and return them for further chaining.
 */
private[spark] trait InitContainerResourceStagingServerSecretPlugin {

  /**
   * Configure the init-container to mount the secret files that allow it to retrieve dependencies
   * from a resource staging server.
   *
   * @param initContainer builder for the init-container to receive the secret volume mount
   * @return the same builder, with the mount added, for continued chaining
   */
  def mountResourceStagingServerSecretIntoInitContainer(
      initContainer: ContainerBuilder): ContainerBuilder

  /**
   * Configure the pod to attach a Secret volume which hosts secret files allowing the
   * init-container to retrieve dependencies from the resource staging server.
   *
   * @param basePod builder for the pod to receive the Secret-backed volume
   * @return the same builder, with the volume added, for continued chaining
   */
  def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
}

/**
 * Default [[InitContainerResourceStagingServerSecretPlugin]]: mounts a named Kubernetes
 * Secret at a caller-supplied path inside the init-container, and declares the matching
 * Secret-backed volume on the pod.
 *
 * @param initContainerSecretName name of the Kubernetes Secret to attach to the pod
 * @param initContainerSecretMountPath directory inside the init-container where the
 *                                     secret's files become visible
 */
private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
    initContainerSecretName: String,
    initContainerSecretMountPath: String)
  extends InitContainerResourceStagingServerSecretPlugin {

  override def mountResourceStagingServerSecretIntoInitContainer(
      initContainer: ContainerBuilder): ContainerBuilder = {
    // Mount the shared secret volume (declared on the pod by the method below) at the
    // configured path; the volume name must match the pod-level volume declaration.
    val withSecretMount = initContainer
      .addNewVolumeMount()
        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
        .withMountPath(initContainerSecretMountPath)
        .endVolumeMount()
    withSecretMount
  }

  override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
    // Declare the Secret-backed volume on the pod spec under the well-known volume name
    // so that the init-container's volume mount resolves to it.
    val withSecretVolume = basePod
      .editSpec()
        .addNewVolume()
          .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(initContainerSecretName)
            .endSecret()
          .endVolume()
        .endSpec()
    withSecretVolume
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, InitContainerUtil}

private[spark] trait SparkPodInitContainerBootstrap {
  /**
   * Bootstraps an init-container that downloads dependencies to be used by a main container.
   * Note that this primarily assumes that the init-container's configuration is being provided
   * by a ConfigMap that was installed by some other component; that is, the implementation
   * here makes no assumptions about how the init-container is specifically configured. For
   * example, this class is unaware if the init-container is fetching remote dependencies or if
   * it is fetching dependencies from a resource staging server.
   *
   * @param mainContainerName name of the container in the pod that should receive the
   *                          shared download-volume mounts
   * @param originalPodSpec pod builder to augment with the init-container and volumes
   * @return the augmented pod builder
   */
  def bootstrapInitContainerAndVolumes(
      mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
}

/**
 * Default [[SparkPodInitContainerBootstrap]]: attaches a "spark-init" init-container to the
 * pod, wires its properties-file ConfigMap volume, creates the two emptyDir volumes that the
 * init-container downloads jars/files into, and mounts those volumes into both the
 * init-container and the named main container.
 *
 * @param initContainerImage Docker image to run as the init-container
 * @param jarsDownloadPath mount path for downloaded jars (shared with the main container)
 * @param filesDownloadPath mount path for downloaded files (shared with the main container)
 * @param downloadTimeoutMinutes download timeout
 *                               (NOTE(review): not referenced in this method — presumably
 *                               consumed via the init-container's properties file; confirm)
 * @param initContainerConfigMapName ConfigMap holding the init-container's properties file
 * @param initContainerConfigMapKey key within that ConfigMap for the properties entry
 * @param resourceStagingServerSecretPlugin optional plugin that additionally mounts the
 *                                          resource-staging-server secret
 */
private[spark] class SparkPodInitContainerBootstrapImpl(
    initContainerImage: String,
    jarsDownloadPath: String,
    filesDownloadPath: String,
    downloadTimeoutMinutes: Long,
    initContainerConfigMapName: String,
    initContainerConfigMapKey: String,
    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
  extends SparkPodInitContainerBootstrap {

  override def bootstrapInitContainerAndVolumes(
      mainContainerName: String,
      originalPodSpec: PodBuilder): PodBuilder = {
    // Mounts shared between the init-container (writer) and the main container (reader):
    // one emptyDir volume for downloaded jars and one for downloaded files.
    val sharedVolumeMounts = Seq[VolumeMount](
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
        .withMountPath(jarsDownloadPath)
        .build(),
      new VolumeMountBuilder()
        .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
        .withMountPath(filesDownloadPath)
        .build())

    // Fix: plain string literal — the previous s"spark-init" used the s-interpolator
    // with nothing to interpolate.
    val initContainer = new ContainerBuilder()
      .withName("spark-init")
      .withImage(initContainerImage)
      .withImagePullPolicy("IfNotPresent")
      .addNewVolumeMount()
        .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
        .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR)
        .endVolumeMount()
      .addToVolumeMounts(sharedVolumeMounts: _*)
      .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
    // If a staging-server secret plugin is configured, let it add its own mount to the
    // init-container before the container is built.
    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
    }.getOrElse(initContainer).build()
    // Append the init-container, then declare: the ConfigMap-backed properties volume,
    // the two emptyDir download volumes, and mount the shared volumes into the main
    // container (matched by name).
    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
        originalPodSpec, resolvedInitContainer)
      .editSpec()
        .addNewVolume()
          .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
          .withNewConfigMap()
            .withName(initContainerConfigMapName)
            .addNewItem()
              .withKey(initContainerConfigMapKey)
              .withPath(INIT_CONTAINER_PROPERTIES_FILE_NAME)
              .endItem()
            .endConfigMap()
          .endVolume()
        .addNewVolume()
          .withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
          .withEmptyDir(new EmptyDirVolumeSource())
          .endVolume()
        .addNewVolume()
          .withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
          .withEmptyDir(new EmptyDirVolumeSource())
          .endVolume()
        .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
          .addToVolumeMounts(sharedVolumeMounts: _*)
          .endContainer()
        .endSpec()
    // Finally, let the plugin (if any) attach the Secret-backed volume at the pod level.
    resourceStagingServerSecretPlugin.map { plugin =>
      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
    }.getOrElse(podWithBasicVolumes)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -349,42 +349,43 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_NAMESPACE = "kubernetes.resourceStagingServer"
private[spark] val RESOURCE_STAGING_SERVER_CERT_PEM =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.serverCertPem")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.serverCertPem")
.doc("Certificate PEM file to use when having the Kubernetes dependency server" +
" listen on TLS.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_PASSWORD_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyStorePasswordFile")
.doc("File containing the keystore password for the Kubernetes dependency server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_KEYSTORE_KEY_PASSWORD_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.keyPasswordFile")
.doc("File containing the key password for the Kubernetes dependency server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.enabled")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.enabled")
.doc("Whether or not to use SSL when communicating with the dependency server.")
.booleanConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStore")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStore")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStorePassword")
.doc("Password for the trustStore for talking to the dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
ConfigBuilder(s"spark.ssl.$RESOURCE_STAGING_SERVER_SSL_NAMESPACE.trustStoreType")
.doc("Type of trustStore for communicating with the dependency server.")
.stringConf
.createOptional
Expand All @@ -397,64 +398,120 @@ package object config extends Logging {
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier")
ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsResourceIdentifier")
.doc("Identifier for the jars tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation")
ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download jars.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH)
.createWithDefault(s"$INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH/" +
s"$INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY")

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier")
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesResourceIdentifier")
.doc("Identifier for the files tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation")
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download files.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH)
.createWithDefault(
s"$INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH/$INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY")

private[spark] val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.initcontainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is" +
" calculated from spark.jars.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.initcontainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is" +
" calculated from spark.files.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image")
.doc("Image for the driver's init-container that downloads mounted dependencies.")
ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
.doc("Image for the driver and executor's init-container that downloads dependencies.")
.stringConf
.createWithDefault(s"spark-driver-init:$sparkVersion")
.createWithDefault(s"spark-init:$sparkVersion")

private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir")
.doc("Location to download local jars to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
private[spark] val INIT_CONTAINER_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.jarsDownloadDir")
.doc("Location to download jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-jars")
.createWithDefault("/var/spark-data/spark-submitted-jars")

private[spark] val DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.filesDownloadDir")
.doc("Location to download local files to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
private[spark] val INIT_CONTAINER_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.filesDownloadDir")
.doc("Location to download files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-files")
.createWithDefault("/var/spark-data/spark-submitted-files")

private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT =
private[spark] val INIT_CONTAINER_MOUNT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
.doc("Timeout before aborting the attempt to download and unpack local dependencies from" +
" the dependency staging server when initializing the driver pod.")
" remote locations and the resource staging server when initializing the driver and" +
" executor pods.")
.timeConf(TimeUnit.MINUTES)
.createWithDefault(5)

private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP =
ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapname")
.doc("Name of the config map to use in the init-container that retrieves submitted files" +
" for the executor.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY =
ConfigBuilder("spark.kubernetes.initcontainer.executor.configmapkey")
.doc("Key for the entry in the init container config map for submitted files that" +
" corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SECRET =
ConfigBuilder("spark.kubernetes.initcontainer.executor.stagingServerSecret.name")
.doc("Name of the secret to mount into the init-container that retrieves submitted files.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SECRET_MOUNT_DIR =
ConfigBuilder("spark.kubernetes.initcontainer.executor.stagingServerSecret.mountDir")
.doc("Directory to mount the resource staging server secrets into for the executor" +
" init-containers. This must be exactly the same as the directory that the submission" +
" client mounted the secret into because the config map's properties specify the" +
" secret location as to be the same between the driver init-container and the executor" +
" init-container. Thus the submission client will always set this and the driver will" +
" never rely on a constant or convention, in order to protect against cases where the" +
" submission client has a different version from the driver itself, and hence might" +
" have different constants loaded in constants.scala.")
.internal()
.stringConf
.createOptional

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
Expand Down
Loading