diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala index 35246506ecb33..4a0e651c50405 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala @@ -16,8 +16,12 @@ */ package org.apache.spark.deploy.k8s -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder} +import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ /** @@ -47,7 +51,9 @@ private[spark] class SparkPodInitContainerBootstrapImpl( filesDownloadPath: String, downloadTimeoutMinutes: Long, initContainerConfigMapName: String, - initContainerConfigMapKey: String) + initContainerConfigMapKey: String, + sparkRole: String, + sparkConf: SparkConf) extends SparkPodInitContainerBootstrap { override def bootstrapInitContainerAndVolumes( @@ -62,10 +68,24 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .withMountPath(filesDownloadPath) .build()) + val initContainerCustomEnvVarKeyPrefix = sparkRole match { + case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY + case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv." + case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role") + } + val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(initContainerCustomEnvVarKeyPrefix) + .toSeq + .map(env => + new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build()) + val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer) .withName(s"spark-init") .withImage(initContainerImage) .withImagePullPolicy(dockerImagePullPolicy) + .addAllToEnv(initContainerCustomEnvVars.asJava) .addNewVolumeMount() .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME) .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR) @@ -73,6 +93,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .addToVolumeMounts(sharedVolumeMounts: _*) .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH) .build() + val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod) .editSpec() .addNewVolume() @@ -95,6 +116,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .endVolume() .endSpec() .build() + val mainContainerWithMountedFiles = new ContainerBuilder( podWithDetachedInitContainer.mainContainer) .addToVolumeMounts(sharedVolumeMounts: _*) @@ -103,6 +125,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .withValue(filesDownloadPath) .endEnv() .build() + PodWithDetachedInitContainer( podWithBasicVolumes, initContainer, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index 07a45dc1f1613..471285d087804 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -17,10 +17,10 @@ package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl} +import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl} import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl} import org.apache.spark.util.Utils @@ -43,7 +43,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( private val submittedResourcesSecretName = s"$kubernetesResourceNamePrefix-init-secret" private val resourceStagingServerUri = submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI) private val resourceStagingServerInternalUri = - submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI) + submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI) private val initContainerImage = submissionSparkConf.get(INIT_CONTAINER_DOCKER_IMAGE) private val downloadTimeoutMinutes = submissionSparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT) private val maybeResourceStagingServerInternalTrustStore = @@ -92,46 +92,62 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( def getAllConfigurationSteps(): Seq[InitContainerConfigurationStep] = { val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( - initContainerImage, - dockerImagePullPolicy, - jarsDownloadPath, - filesDownloadPath, - downloadTimeoutMinutes, - initContainerConfigMapName, - initContainerConfigMapKey) + initContainerImage, + dockerImagePullPolicy, + jarsDownloadPath, + filesDownloadPath, + downloadTimeoutMinutes, + initContainerConfigMapName, + initContainerConfigMapKey, + SPARK_POD_DRIVER_ROLE, + submissionSparkConf) val baseInitContainerStep = new BaseInitContainerConfigurationStep( - sparkJars, - sparkFiles, - jarsDownloadPath, - filesDownloadPath, - initContainerConfigMapName, - initContainerConfigMapKey, - initContainerBootstrap) - val submittedResourcesInitContainerStep = resourceStagingServerUri.map { - stagingServerUri => + sparkJars, + sparkFiles, + jarsDownloadPath, + filesDownloadPath, + initContainerConfigMapName, + initContainerConfigMapKey, + initContainerBootstrap) + + val submittedResourcesInitContainerStep = resourceStagingServerUri.map { stagingServerUri => val mountSecretPlugin = new InitContainerResourceStagingServerSecretPluginImpl( - submittedResourcesSecretName, - INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH) + submittedResourcesSecretName, + INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH) val submittedDependencyUploader = new SubmittedDependencyUploaderImpl( - driverLabels, - namespace, - stagingServerUri, - sparkJars, - sparkFiles, - new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions, - RetrofitClientFactoryImpl) + driverLabels, + namespace, + stagingServerUri, + sparkJars, + sparkFiles, + new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions, + RetrofitClientFactoryImpl) new SubmittedResourcesInitContainerConfigurationStep( - submittedResourcesSecretName, - resourceStagingServerInternalUri.getOrElse(stagingServerUri), - INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH, - resourceStagingServerInternalSslEnabled, - maybeResourceStagingServerInternalTrustStore, - maybeResourceStagingServerInternalClientCert, - maybeResourceStagingServerInternalTrustStorePassword, - maybeResourceStagingServerInternalTrustStoreType, - submittedDependencyUploader, - mountSecretPlugin) + submittedResourcesSecretName, + resourceStagingServerInternalUri.getOrElse(stagingServerUri), + INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH, + resourceStagingServerInternalSslEnabled, + maybeResourceStagingServerInternalTrustStore, + maybeResourceStagingServerInternalClientCert, + maybeResourceStagingServerInternalTrustStorePassword, + maybeResourceStagingServerInternalTrustStoreType, + submittedDependencyUploader, + mountSecretPlugin) + } + + val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs( + submissionSparkConf, + KUBERNETES_DRIVER_SECRETS_PREFIX, + "driver secrets") + val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) { + val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths) + Some(new InitContainerMountSecretsStep(mountSecretsBootstrap)) + } else { + None } - Seq(baseInitContainerStep) ++ submittedResourcesInitContainerStep.toSeq + + Seq(baseInitContainerStep) ++ + submittedResourcesInitContainerStep.toSeq ++ + mountSecretsStep.toSeq } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala new file mode 100644 index 0000000000000..a2df3d35b8c84 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer + +import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap + +/** + * An init-container configuration step for mounting user-specified secrets onto user-specified + * paths. + * + * @param mountSecretsBootstrap a utility actually handling mounting of the secrets. + */ +private[spark] class InitContainerMountSecretsStep( + mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep { + + override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = { + val (podWithSecretsMounted, initContainerWithSecretsMounted) = + mountSecretsBootstrap.mountSecrets( + initContainerSpec.podToInitialize, + initContainerSpec.initContainer) + initContainerSpec.copy( + podToInitialize = podWithSecretsMounted, + initContainer = initContainerWithSecretsMounted + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 98a0d879b6a58..73e08b170a4a0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.scheduler.cluster.k8s -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} import scala.collection.JavaConverters._ -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ @@ -44,6 +44,7 @@ private[spark] class ExecutorPodFactoryImpl( mountSecretsBootstrap: Option[MountSecretsBootstrap], mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], + executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider) extends ExecutorPodFactory { @@ -82,9 +83,6 @@ private[spark] class ExecutorPodFactoryImpl( private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) private val blockmanagerPort = sparkConf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) - private val kubernetesDriverPodName = sparkConf - .get(KUBERNETES_DRIVER_POD_NAME) - .getOrElse(throw new SparkException("Must specify the driver pod name")) private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) @@ -234,6 +232,7 @@ private[spark] class ExecutorPodFactoryImpl( bootstrap.mountSmallFilesSecret( withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) }.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)) + val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = executorInitContainerBootstrap.map { bootstrap => val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( @@ -247,8 +246,13 @@ private[spark] class ExecutorPodFactoryImpl( podWithDetachedInitContainer.initContainer) }.getOrElse(podWithDetachedInitContainer.initContainer) + val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) = + executorInitContainerMountSecretsBootstrap.map { bootstrap => + bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer) + }.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer) + val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( - podWithDetachedInitContainer.pod, resolvedInitContainer) + mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 60260e2931c29..709f3b72b92f5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.ThreadUtils private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { @@ -78,7 +78,9 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION), sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT), configMap, - configMapKey) + configMapKey, + SPARK_POD_EXECUTOR_ROLE, + sparkConf) } val mountSmallFilesBootstrap = for { @@ -95,6 +97,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit } else { None } + val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) { + Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths)) + } else { + None + } if (maybeInitContainerConfigMap.isEmpty) { logWarning("The executor's init-container config map was not specified. Executors will" + @@ -133,6 +140,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit mountSecretBootstrap, mountSmallFilesBootstrap, executorInitContainerBootstrap, + executorInitContainerMountSecretsBootstrap, executorInitContainerSecretVolumePlugin, executorLocalDirVolumeProvider) val allocatorExecutor = ThreadUtils diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala index 946fde95ca80b..d850b2a694f20 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala @@ -16,14 +16,17 @@ */ package org.apache.spark.deploy.k8s +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model._ import org.scalatest.BeforeAndAfter -import scala.collection.JavaConverters._ +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.SparkFunSuite class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAfter { + private val INIT_CONTAINER_IMAGE = "spark-init:latest" private val DOCKER_IMAGE_PULL_POLICY = "IfNotPresent" private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" @@ -40,7 +43,9 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf FILES_DOWNLOAD_PATH, DOWNLOAD_TIMEOUT_MINUTES, INIT_CONTAINER_CONFIG_MAP_NAME, - INIT_CONTAINER_CONFIG_MAP_KEY) + INIT_CONTAINER_CONFIG_MAP_KEY, + SPARK_POD_DRIVER_ROLE, + new SparkConf()) private val expectedSharedVolumeMap = Map( JARS_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME, FILES_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME) @@ -60,6 +65,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(initContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY) assert(initContainer.getArgs.asScala.head === INIT_CONTAINER_PROPERTIES_FILE_PATH) } + test("Main: Volume mounts and env") { val returnedPodWithCont = sparkPodInit.bootstrapInitContainerAndVolumes( PodWithDetachedInitContainer( @@ -73,6 +79,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(mainContainer.getEnv.asScala.map(e => (e.getName, e.getValue)).toMap === Map(ENV_MOUNTED_FILES_DIR -> FILES_DOWNLOAD_PATH)) } + test("Pod: Volume Mounts") { val returnedPodWithCont = sparkPodInit.bootstrapInitContainerAndVolumes( PodWithDetachedInitContainer( @@ -93,6 +100,70 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(volumes(2).getEmptyDir === new EmptyDirVolumeSource()) } + test("InitContainer: driver custom environment variables") { + val sparkConf = new SparkConf() + .set(s"${KUBERNETES_DRIVER_ENV_KEY}env1", "val1") + .set(s"${KUBERNETES_DRIVER_ENV_KEY}env2", "val2") + val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_IMAGE, + DOCKER_IMAGE_PULL_POLICY, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOWNLOAD_TIMEOUT_MINUTES, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + SPARK_POD_DRIVER_ROLE, + sparkConf) + + val returnedPod = initContainerBootstrap.bootstrapInitContainerAndVolumes( + PodWithDetachedInitContainer( + pod = basePod().build(), + initContainer = new Container(), + mainContainer = new ContainerBuilder().withName(MAIN_CONTAINER_NAME).build())) + val initContainer: Container = returnedPod.initContainer + + assert(initContainer.getEnv.size() == 2) + val envVars = initContainer + .getEnv + .asScala + .map(env => (env.getName, env.getValue)) + .toMap + assert(envVars("env1") == "val1") + assert(envVars("env2") == "val2") + } + + test("InitContainer: executor custom environment variables") { + val sparkConf = new SparkConf() + .set(s"spark.executorEnv.env1", "val1") + .set(s"spark.executorEnv.env2", "val2") + val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_IMAGE, + DOCKER_IMAGE_PULL_POLICY, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOWNLOAD_TIMEOUT_MINUTES, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + SPARK_POD_EXECUTOR_ROLE, + sparkConf) + + val returnedPod = initContainerBootstrap.bootstrapInitContainerAndVolumes( + PodWithDetachedInitContainer( + pod = basePod().build(), + initContainer = new Container(), + mainContainer = new ContainerBuilder().withName(MAIN_CONTAINER_NAME).build())) + val initContainer: Container = returnedPod.initContainer + + assert(initContainer.getEnv.size() == 2) + val envVars = initContainer + .getEnv + .asScala + .map(env => (env.getName, env.getValue)) + .toMap + assert(envVars("env1") == "val1") + assert(envVars("env2") == "val2") + } + private def basePod(): PodBuilder = { new PodBuilder() .withNewMetadata() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala index 3810a324f99b5..fad178ab8bfa7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala @@ -43,6 +43,9 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map" private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key" private val STAGING_SERVER_URI = "http://localhost:8000" + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/init-container" test ("error thrown if local jars provided without resource staging server") { val sparkConf = new SparkConf(true) @@ -160,4 +163,27 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { assert(initSteps.length === 1) assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) } + + test("including step to mount user-specified secrets") { + val sparkConf = new SparkConf(false) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH) + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps : Seq[InitContainerConfigurationStep] = + orchestrator.getAllConfigurationSteps() + assert(initSteps.length === 2) + assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) + assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep]) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala new file mode 100644 index 0000000000000..9ac48126ca381 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils} + +class InitContainerMountSecretsStepSuite extends SparkFunSuite { + + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/init-container" + + test("Mounts all given secrets") { + val baseInitContainerSpec = InitContainerSpec( + Map.empty, + Map.empty, + new ContainerBuilder().build(), + new ContainerBuilder().build(), + new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(), + Seq.empty) + val secretNamesToMountPaths = Map( + SECRET_FOO -> SECRET_MOUNT_PATH, + SECRET_BAR -> SECRET_MOUNT_PATH) + + val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths) + val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap) + val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer( + baseInitContainerSpec) + + val podWithSecretsMounted = configuredInitContainerSpec.podToInitialize + val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer + + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName => + assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName))) + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName => + assert(SecretVolumeUtils.containerHasVolume( + initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 71204a5aa1deb..14cadb4acaa0f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -77,6 +77,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -111,13 +112,14 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") val factory = new ExecutorPodFactoryImpl( - conf, - nodeAffinityExecutorPodModifier, - None, - None, - None, - None, - executorLocalDirVolumeProvider) + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -138,6 +140,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -172,6 +175,34 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, Some(initContainerBootstrap), None, + None, + executorLocalDirVolumeProvider) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + + assert(executor.getSpec.getInitContainers.size() === 1) + + checkOwnerReferences(executor, driverPodUid) + } + + test("init-container with secrets mount bootstrap") { + val conf = baseConf.clone() + val initContainerBootstrap = mock(classOf[SparkPodInitContainerBootstrap]) + when(initContainerBootstrap.bootstrapInitContainerAndVolumes( + any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1")) + + val factory = new ExecutorPodFactoryImpl( + conf, + nodeAffinityExecutorPodModifier, + None, + None, + Some(initContainerBootstrap), + Some(secretsBootstrap), + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -180,6 +211,11 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) assert(executor.getSpec.getInitContainers.size() === 1) + assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName + === "secret1-volume") + assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0) + .getMountPath === "/var/secret1") + checkOwnerReferences(executor, driverPodUid) } @@ -202,6 +238,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -223,6 +260,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef Some(smallFiles), None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -250,13 +288,14 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") val factory = new ExecutorPodFactoryImpl( - conf, - nodeAffinityExecutorPodModifier, - None, - None, - None, - None, - executorLocalDirVolumeProvider) + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]())