Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep}
import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.HadoopStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator
import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{SystemClock, Utils}

Expand Down Expand Up @@ -117,6 +116,13 @@ private[spark] class DriverConfigurationStepsOrchestrator(
val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep(
submissionSparkConf)

val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrap(driverSecretNamesToMountPaths)
Some(new MountSecretsStep(mountSecretsBootstrap))
} else {
None
}

val hadoopConfigSteps =
hadoopConfDir.map { conf =>
val hadoopStepsOrchestrator =
Expand Down Expand Up @@ -204,23 +210,16 @@ private[spark] class DriverConfigurationStepsOrchestrator(
jarsDownloadPath,
localFilesDownloadPath)

val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
Some(new MountSecretsStep(mountSecretsBootstrap))
} else {
None
}

Seq(
initialSubmissionStep,
driverAddressStep,
kubernetesCredentialsStep,
dependencyResolutionStep,
localDirectoryMountConfigurationStep) ++
mountSecretsStep.toSeq ++
submittedDependenciesBootstrapSteps ++
hadoopConfigSteps.toSeq ++
resourceStep.toSeq ++
mountSecretsStep.toSeq
resourceStep.toSeq
}

private def areAnyFilesNonContainerLocal(files: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,17 @@ package org.apache.spark.deploy.k8s.submit

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

/**
* Bootstraps a driver or executor pod with needed secrets mounted.
*/
private[spark] trait MountSecretsBootstrap {
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {

/**
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
* Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
*
* @param pod the pod into which the secret volumes are being added.
* @param container the container into which the secret volumes are being mounted.
* @return the updated pod and container with the secrets mounted.
* @return the updated pod with the secret volumes added.
*/
def mountSecrets(pod: Pod, container: Container): (Pod, Container)
}

private[spark] class MountSecretsBootstrapImpl(
secretNamesToMountPaths: Map[String, String]) extends MountSecretsBootstrap {

override def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
def addSecretVolumes(pod: Pod): Pod = {
var podBuilder = new PodBuilder(pod)
secretNamesToMountPaths.keys.foreach(name =>
secretNamesToMountPaths.keys.foreach { name =>
podBuilder = podBuilder
.editOrNewSpec()
.addNewVolume()
Expand All @@ -47,18 +37,30 @@ private[spark] class MountSecretsBootstrapImpl(
.withSecretName(name)
.endSecret()
.endVolume()
.endSpec())
.endSpec()
}

podBuilder.build()
}

/**
* Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
* given container.
*
* @param container the container into which the secret volumes are being mounted.
* @return the updated container with the secrets mounted.
*/
def mountSecrets(container: Container): Container = {
var containerBuilder = new ContainerBuilder(container)
secretNamesToMountPaths.foreach(namePath =>
secretNamesToMountPaths.foreach { case (name, path) =>
containerBuilder = containerBuilder
.addNewVolumeMount()
.withName(secretVolumeName(namePath._1))
.withMountPath(namePath._2)
.withName(secretVolumeName(name))
.withMountPath(path)
.endVolumeMount()
)
}

(podBuilder.build(), containerBuilder.build())
containerBuilder.build()
}

private def secretVolumeName(secretName: String): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
/**
* A driver configuration step for mounting user-specified secrets onto user-specified paths.
*
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
* @param bootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class MountSecretsStep(
mountSecretsBootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(driverSpec.driverPod, driverSpec.driverContainer)
val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
val container = bootstrap.mountSecrets(driverSpec.driverContainer)
driverSpec.copy(
driverPod = driverPodWithSecretsMounted,
driverContainer = driverContainerWithSecretsMounted
driverPod = pod,
driverContainer = container
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrap, SubmittedDependencyUploaderImpl}
import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -140,7 +140,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
KUBERNETES_DRIVER_SECRETS_PREFIX,
"driver secrets")
val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) {
val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(driverSecretNamesToMountPaths)
Some(new InitContainerMountSecretsStep(mountSecretsBootstrap))
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@ import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap
* An init-container configuration step for mounting user-specified secrets onto user-specified
* paths.
*
* @param mountSecretsBootstrap a utility actually handling mounting of the secrets.
* @param bootstrap a utility actually handling mounting of the secrets.
*/
private[spark] class InitContainerMountSecretsStep(
mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {

override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = {
val (podWithSecretsMounted, initContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(
initContainerSpec.podToInitialize,
initContainerSpec.initContainer)
initContainerSpec.copy(
podToInitialize = podWithSecretsMounted,
initContainer = initContainerWithSecretsMounted
)
override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
// Mount the secret volumes given that the volumes have already been added to the driver pod
// when mounting the secrets into the main driver container.
val initContainer = bootstrap.mountSecrets(spec.initContainer)
spec.copy(initContainer = initContainer)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package org.apache.spark.scheduler.cluster.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder}
import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, HadoopConfSparkUserBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
Expand Down Expand Up @@ -235,7 +235,8 @@ private[spark] class ExecutorPodFactoryImpl(

val (withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) =
mountSecretsBootstrap.map {bootstrap =>
bootstrap.mountSecrets(executorPod, containerWithExecutorLimitCores)
(bootstrap.addSecretVolumes(executorPod),
bootstrap.mountSecrets(containerWithExecutorLimitCores))
}.getOrElse((executorPod, containerWithExecutorLimitCores))
val (withMaybeSmallFilesMountedPod, withMaybeSmallFilesMountedContainer) =
mountSmallFilesBootstrap.map { bootstrap =>
Expand All @@ -258,7 +259,7 @@ private[spark] class ExecutorPodFactoryImpl(

val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) =
executorInitContainerMountSecretsBootstrap.map { bootstrap =>
bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer)
(podWithDetachedInitContainer.pod, bootstrap.mountSecrets(resolvedInitContainer))
}.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer)

val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopConfUtils, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrap, MountSmallFilesBootstrapImpl}
import org.apache.spark.internal.Logging
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
Expand Down Expand Up @@ -125,12 +125,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
val executorSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs(sparkConf,
KUBERNETES_EXECUTOR_SECRETS_PREFIX, "executor secrets")
val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) {
Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths))
Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths))
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s.submit
package org.apache.spark.deploy.k8s

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] object SecretVolumeUtils {

def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
def podHasVolume(pod: Pod, volumeName: String): Boolean = {
pod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
}

def containerHasVolume(
driverContainer: Container,
container: Container,
volumeName: String,
mountPath: String): Boolean = {
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
container.getVolumeMounts.asScala.exists(volumeMount =>
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.submit
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.SecretVolumeUtils

private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {

Expand All @@ -34,9 +35,9 @@ private[spark] class MountSecretsBootstrapSuite extends SparkFunSuite {
val driverContainer = new ContainerBuilder().build()
val driverPod = new PodBuilder().build()

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val bootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val (driverPodWithSecretsMounted, driverContainerWithSecretsMounted) =
mountSecretsBootstrap.mountSecrets(driverPod, driverContainer)
(bootstrap.addSecretVolumes(driverPod), bootstrap.mountSecrets(driverContainer))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.submitsteps
package org.apache.spark.deploy.k8s.submit.submitsteps

import java.nio.file.Paths

import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.deploy.k8s.submit.submitsteps.{KubernetesDriverSpec, LocalDirectoryMountConfigurationStep}

private[spark] class LocalDirectoryMountConfigurationStepSuite extends SparkFunSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package org.apache.spark.deploy.k8s.submit.submitsteps

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.SecretVolumeUtils
import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap

private[spark] class MountSecretsStepSuite extends SparkFunSuite {

Expand All @@ -31,7 +32,7 @@ private[spark] class MountSecretsStepSuite extends SparkFunSuite {
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val mountSecretsStep = new MountSecretsStep(mountSecretsBootstrap)
val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec)
val driverPodWithSecretsMounted = configuredDriverSpec.driverPod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils}
import org.apache.spark.deploy.k8s.SecretVolumeUtils
import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap

class InitContainerMountSecretsStepSuite extends SparkFunSuite {

Expand All @@ -39,16 +40,12 @@ class InitContainerMountSecretsStepSuite extends SparkFunSuite {
SECRET_FOO -> SECRET_MOUNT_PATH,
SECRET_BAR -> SECRET_MOUNT_PATH)

val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths)
val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths)
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
baseInitContainerSpec)

val podWithSecretsMounted = configuredInitContainerSpec.podToInitialize
val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer

Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
assert(SecretVolumeUtils.containerHasVolume(
initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))
Expand Down
Loading