This repository was archived by the owner on Jan 9, 2020. It is now read-only.
@@ -365,96 +365,155 @@ package object config extends Logging {
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier")
ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsResourceIdentifier")
.doc("Identifier for the jars tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation")
ConfigBuilder("spark.kubernetes.initcontainer.downloadJarsSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download jars.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_PATH)

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier")
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesResourceIdentifier")
.doc("Identifier for the files tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation")
ConfigBuilder("spark.kubernetes.initcontainer.downloadFilesSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download files.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_SECRET_PATH)

private[spark] val INIT_CONTAINER_REMOTE_JARS =
ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteJars")
ConfigBuilder("spark.kubernetes.initcontainer.remoteJars")
.doc("Comma-separated list of jar URIs to download in the init-container. This is inferred" +
" from spark.jars.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_REMOTE_FILES =
ConfigBuilder("spark.kubernetes.driver.initcontainer.remoteFiles")
ConfigBuilder("spark.kubernetes.initcontainer.remoteFiles")
.doc("Comma-separated list of file URIs to download in the init-container. This is inferred" +
" from spark.files.")
.internal()
.stringConf
.createOptional
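
These remote-URI entries are internal and derived rather than user-set; as a minimal sketch (the helper name and exact filtering are assumptions, not from this PR), the submission client could infer them from spark.jars/spark.files by keeping only URIs with a non-local scheme, since file:// and local:// dependencies go through the staging-server path instead:

```scala
import org.apache.spark.util.Utils

// Hypothetical helper: keep only URIs whose scheme marks them as remote;
// file:// and local:// dependencies are handled elsewhere.
def inferRemoteUris(uris: Seq[String]): Option[String] = {
  val remote = uris.filter { uri =>
    Option(Utils.resolveURI(uri).getScheme).exists(s => s != "file" && s != "local")
  }
  if (remote.nonEmpty) Some(remote.mkString(",")) else None
}
```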

private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image")
.doc("Image for the driver's init-container that downloads mounted dependencies.")
ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
.doc("Image for the driver and executor's init-container that downloads dependencies.")
.stringConf
.createWithDefault(s"spark-driver-init:$sparkVersion")
.createWithDefault(s"spark-init:$sparkVersion")

private[spark] val DRIVER_SUBMITTED_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedJars.downloadDir")
.doc("Location to download local jars to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
private[spark] val SUBMITTED_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.submittedJars.downloadDir")
.doc("Location to download local jars to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-jars")
.createWithDefault("/var/spark-data/spark-submitted-jars")

private[spark] val DRIVER_SUBMITTED_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.submittedFiles.downloadDir")
.doc("Location to download local files to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
private[spark] val SUBMITTED_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.submittedFiles.downloadDir")
.doc("Location to download submitted files to in the driver and executors. When using" +
" spark-submit, this directory must be empty and will be mounted as an empty directory" +
" volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-files")
.createWithDefault("/var/spark-data/spark-submitted-files")

private[spark] val DRIVER_REMOTE_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.remoteJars.downloadDir")
.doc("Location to download remotely-located (e.g. HDFS) jars to in the driver. When" +
" using spark-submit, this directory must be empty and will be mounted as an empty" +
" directory volume on the driver pod.")
private[spark] val REMOTE_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.remoteJars.downloadDir")
.doc("Location to download remotely-located (e.g. HDFS) jars to in the driver and" +
" executors. When using spark-submit, this directory must be empty and will be" +
" mounted as an empty directory volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-remote-jars")

private[spark] val DRIVER_REMOTE_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.remoteFiles.downloadDir")
.doc("Location to download remotely-located (e.g. HDFS) files to in the driver. When" +
" using spark-submit, this directory must be empty and will be mounted as an empty" +
" directory volume on the driver pod.")
private[spark] val REMOTE_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.mountdependencies.remoteFiles.downloadDir")
.doc("Location to download remotely-located (e.g. HDFS) files to in the driver and" +
" executors. When using spark-submit, this directory must be empty and will be mounted" +
" as an empty directory volume on the driver and executor pods.")
.stringConf
.createWithDefault("/var/spark-data/spark-remote-files")

private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT =
private[spark] val MOUNT_DEPENDENCIES_INIT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
.doc("Timeout before aborting the attempt to download and unpack local dependencies from" +
" the dependency staging server when initializing the driver pod.")
" remote locations and the resource etaging server when initializing the driver and" +
" executor pods.")
.timeConf(TimeUnit.MINUTES)
.createWithDefault(5)
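
For illustration only, the user-facing knobs above could be tuned at submit time like this (the image name and values below are placeholders, not recommendations from this PR):

```scala
import org.apache.spark.SparkConf

// Placeholder values; the keys are the ones defined above.
val conf = new SparkConf()
  .set("spark.kubernetes.initcontainer.docker.image", "registry.example.com/spark-init:v2.1.0")
  .set("spark.kubernetes.mountdependencies.remoteJars.downloadDir", "/var/spark-data/spark-remote-jars")
  .set("spark.kubernetes.mountdependencies.mountTimeout", "10m")
```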

private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP =
ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.configmapname")
.doc("Name of the config map to use in the init-container that retrieves submitted files" +
" for the executor.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY =
ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.configmapkey")
.doc("Key for the entry in the init container config map for submitted files that" +
" corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET =
ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.stagingServerSecret")
.doc("Name of the secret to mount into the init-container that retrieves submitted files.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_SUBMITTED_FILES_RESOURCE_STAGING_SERVER_SECRET_DIR =
ConfigBuilder("spark.kubernetes.initcontainer.executor.submittedfiles.stagingServerSecretDir")
.doc("Directory to mount the executor's init container secret for retrieving submitted" +
" files.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP =
ConfigBuilder("spark.kubernetes.initcontainer.executor.remoteFiles.configmapname")
.doc("Name of the config map to use in the init-container that retrieves remote files" +
" for the executor.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY =
ConfigBuilder("spark.kubernetes.initcontainer.executor.remoteFiles.configmapkey")
.doc("Key for the entry in the init container config map for remote files that" +
" corresponds to the properties for this init-container.")
.internal()
.stringConf
.createOptional

private[spark] val EXECUTOR_RESOLVED_MOUNTED_CLASSPATH =
ConfigBuilder("spark.kubernetes.executor.resolvedMountedClasspath")
.doc("Expected resolved classpath after the executor's init-containers download" +
" dependencies from the resource staging server and from remote locations, if" +
" applicable. The submission client determines this assuming that the executors will" +
" download the dependencies in the same way that the driver does.")
.internal()
.stringConf
.toSequence
.createWithDefault(Seq.empty[String])

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
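
A usage sketch; the rest of the method body is folded out of this diff, so the normalized return value is an assumption based on the validation shown above:

```scala
import org.apache.spark.deploy.kubernetes.config._

// Assumed (body folded above): the k8s:// prefix is stripped to recover the
// API server URL.
val apiServerUrl = resolveK8sMaster("k8s://https://kubernetes.default.svc")
// resolveK8sMaster("https://kubernetes.default.svc") throws IllegalArgumentException,
// per the check visible in this diff.
```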
@@ -87,6 +87,7 @@ package object constants {
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
private[spark] val ENV_EXECUTOR_EXTRA_CLASSPATH = "SPARK_EXECUTOR_EXTRA_CLASSPATH"
private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
@@ -107,12 +108,6 @@ package object constants {

// V2 submission init container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"

// Init container for downloading submitted files from the staging server.
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_CONTAINER_NAME =
"spark-driver-download-submitted-files"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_NAME =
"resource-staging-server-secret"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SECRETS_VOLUME_MOUNT_PATH =
"/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_SECRET_KEY =
@@ -131,35 +126,13 @@ package object constants {
s"$INIT_CONTAINER_SUBMITTED_FILES_TRUSTSTORE_SECRET_KEY"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_CONFIG_MAP_KEY =
"download-submitted-files"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_VOLUME =
"download-submitted-files-properties"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH =
"/etc/spark-init/download-submitted-files"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME =
"init-driver-download-submitted-files.properties"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_MOUNT_PATH/" +
s"$INIT_CONTAINER_SUBMITTED_FILES_PROPERTIES_FILE_NAME"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_JARS_VOLUME_NAME =
"download-submitted-jars"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_DOWNLOAD_FILES_VOLUME_NAME =
"download-submitted-files"

// Init container for fetching remote dependencies.
private[spark] val INIT_CONTAINER_REMOTE_FILES_CONTAINER_NAME =
"spark-driver-download-remote-files"
private[spark] val INIT_CONTAINER_REMOTE_FILES_CONFIG_MAP_KEY =
"download-remote-files"
private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_VOLUME =
"download-remote-files-properties"
private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH =
"/etc/spark-init/download-remote-files"
private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME =
"init-driver-download-remote-files.properties"
private[spark] val INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_MOUNT_PATH/" +
s"$INIT_CONTAINER_REMOTE_FILES_PROPERTIES_FILE_NAME"
private[spark] val INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_JARS_VOLUME_NAME = "download-remote-jars"
private[spark] val INIT_CONTAINER_REMOTE_FILES_DOWNLOAD_FILES_VOLUME_NAME =
"download-remote-files"
private[spark] val INIT_CONTAINER_SUBMITTED_FILES_SUFFIX = "staged-files"
private[spark] val INIT_CONTAINER_REMOTE_FILES_SUFFIX = "remote-files"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME = "download-jars-volume"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME = "download-files"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "spark-init-properties"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "spark-init.properties"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"
}
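
As a sketch of why the per-purpose constants were collapsed: both init-containers can now share one naming scheme, distinguished only by suffix. The composition below is illustrative; the actual name assembly lives in code not shown in this diff:

```scala
import org.apache.spark.deploy.kubernetes.constants._

// Illustrative only: one naming scheme for both init-containers, keyed by suffix.
val stagedFilesInitContainerName = s"spark-init-$INIT_CONTAINER_SUBMITTED_FILES_SUFFIX" // "spark-init-staged-files"
val remoteFilesInitContainerName = s"spark-init-$INIT_CONTAINER_REMOTE_FILES_SUFFIX"    // "spark-init-remote-files"
```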
@@ -156,11 +156,11 @@ private[spark] class Client(
// If the resource staging server isn't being used, then resolvedJars = spark.jars.
val resolvedJars = mutable.Buffer[String]()
val resolvedFiles = mutable.Buffer[String]()
val driverPodWithMountedDeps = maybeStagingServerUri.map { stagingServerUri =>
val (resolvedSparkConf, driverPodWithMountedDeps) = maybeStagingServerUri.map { stagingUri =>
val submittedDependencyManager = submittedDependencyManagerProvider
.getSubmittedDependencyManager(
kubernetesAppId,
stagingServerUri,
stagingUri,
allLabels,
namespace,
sparkJars,
@@ -175,11 +175,16 @@ private[spark] class Client(
resolvedFiles ++= submittedDependencyManager.resolveSparkFiles()
nonDriverPodKubernetesResources += initContainerKubernetesSecret
nonDriverPodKubernetesResources += initContainerConfigMap
submittedDependencyManager.configurePodToMountLocalDependencies(
driverContainer.getName,
initContainerKubernetesSecret,
initContainerConfigMap,
driverPodWithMountedDriverKubernetesCredentials)
val bootstrappedPod = submittedDependencyManager.getInitContainerBootstrap(
initContainerKubernetesSecret, initContainerConfigMap)
.bootstrapInitContainerAndVolumes(
driverContainer.getName, driverPodWithMountedDriverKubernetesCredentials)
val sparkConfWithExecutorInitContainer = submittedDependencyManager
.configureExecutorsToFetchSubmittedDependencies(
sparkConfWithDriverPodKubernetesCredentialLocations,
initContainerConfigMap,
initContainerKubernetesSecret)
(sparkConfWithExecutorInitContainer, bootstrappedPod)
}.getOrElse {
sparkJars.map(Utils.resolveURI).foreach { jar =>
require(Option.apply(jar.getScheme).getOrElse("file") != "file",
@@ -193,9 +198,9 @@ private[spark] class Client(
}
resolvedJars ++= sparkJars
resolvedFiles ++= sparkFiles
driverPodWithMountedDriverKubernetesCredentials
(sparkConfWithDriverPodKubernetesCredentialLocations.clone(),
driverPodWithMountedDriverKubernetesCredentials)
}
val resolvedSparkConf = sparkConfWithDriverPodKubernetesCredentialLocations.clone()
if (resolvedJars.nonEmpty) {
resolvedSparkConf.set("spark.jars", resolvedJars.mkString(","))
}
@@ -215,8 +220,11 @@ private[spark] class Client(
.buildInitContainerConfigMap()
nonDriverPodKubernetesResources += downloadRemoteDependenciesConfigMap
val driverPodWithMountedAndDownloadedDeps = remoteDependencyManager
.configurePodToDownloadRemoteDependencies(
downloadRemoteDependenciesConfigMap, driverContainer.getName, driverPodWithMountedDeps)
.getInitContainerBootstrap(downloadRemoteDependenciesConfigMap)
.bootstrapInitContainerAndVolumes(driverContainer.getName, driverPodWithMountedDeps)
val sparkConfExecutorsFetchRemoteDeps = remoteDependencyManager
.configureExecutorsToFetchRemoteDependencies(
resolvedSparkConf, downloadRemoteDependenciesConfigMap)

// The resolved local classpath should *only* contain local file URIs. It consists of the
// driver's classpath (minus spark.driver.extraClassPath which was handled above) with the
@@ -227,8 +235,10 @@ private[spark] class Client(
resolvedLocalClassPath.foreach { classPathEntry =>
require(Option(URI.create(classPathEntry).getScheme).isEmpty)
}
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case (confKey, confValue) =>
s"-D$confKey=$confValue"
sparkConfExecutorsFetchRemoteDeps.set(
EXECUTOR_RESOLVED_MOUNTED_CLASSPATH, resolvedLocalClassPath)
Author comment: I'm not entirely sure if this is the best way to pass this information along. The submission client here has already resolved what the local classpath should be and assumes that the same localized classpath would be valid on the executors. This is similar to what the YARN submission client does, but it would be nicer to have the driver itself be responsible for determining the classpath of its executors. However, perhaps having such logic on the driver is repetitive if the submission client had already determined what the resolved classpath would be.
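
A sketch of the executor-side half of this handoff, assuming the executor launch path simply reads back what the submission client resolved (the consumption site is not shown in this diff):

```scala
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

// Assumption: read the pre-resolved classpath and join it for the executor
// JVM, rather than recomputing it on the driver.
def executorClasspath(sparkConf: SparkConf): String =
  sparkConf.get(EXECUTOR_RESOLVED_MOUNTED_CLASSPATH).mkString(File.pathSeparator)
```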

Author comment: Or we can just add jarsDownloadPath/* to the classpath. Everywhere in Spark the exact jar files are enumerated, but since we're guaranteeing that the download path is an empty directory volume mount and is strictly being used for jars, it might make sense to just reference the whole directory in this context.
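
A minimal sketch of the wildcard alternative this comment describes (the helper is hypothetical, not part of the PR):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._

// Hypothetical: reference the whole download directory instead of enumerating
// each jar; relies on the emptyDir volume holding only jars.
def wildcardClasspath(sparkConf: SparkConf): String =
  s"${sparkConf.get(SUBMITTED_JARS_DOWNLOAD_LOCATION)}/*"
```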

val resolvedDriverJavaOpts = sparkConfExecutorsFetchRemoteDeps.getAll.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
val resolvedDriverPod = driverPodWithMountedAndDownloadedDeps.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))