From ae60cdcc2322ea97089298d21462ed8bc1e54c3b Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Tue, 28 Nov 2017 15:14:57 -0800 Subject: [PATCH 1/4] Allow setting user-specified environments in the init-container --- .../k8s/SparkPodInitContainerBootstrap.scala | 20 ++++- .../org/apache/spark/deploy/k8s/config.scala | 2 + ...tainerConfigurationStepsOrchestrator.scala | 74 ++++++++++--------- .../k8s/KubernetesClusterManager.scala | 5 +- .../SparkPodInitContainerBootstrapSuite.scala | 43 ++++++++++- 5 files changed, 101 insertions(+), 43 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala index 35246506ecb33..51e11c2ee49d9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala @@ -16,8 +16,12 @@ */ package org.apache.spark.deploy.k8s -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder} +import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.config.KUBERNETES_INITCONTAINER_ENV_KEY import org.apache.spark.deploy.k8s.constants._ /** @@ -47,7 +51,8 @@ private[spark] class SparkPodInitContainerBootstrapImpl( filesDownloadPath: String, downloadTimeoutMinutes: Long, initContainerConfigMapName: String, - initContainerConfigMapKey: String) + initContainerConfigMapKey: String, + sparkConf: SparkConf) extends SparkPodInitContainerBootstrap { override def bootstrapInitContainerAndVolumes( @@ -62,10 +67,18 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .withMountPath(filesDownloadPath) .build()) + val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(KUBERNETES_INITCONTAINER_ENV_KEY) + .toSeq + .map(env => + new EnvVarBuilder() + .withName(env._1) + .withValue(env._2) + .build()) val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer) .withName(s"spark-init") .withImage(initContainerImage) .withImagePullPolicy(dockerImagePullPolicy) + .addAllToEnv(initContainerCustomEnvVars.asJava) .addNewVolumeMount() .withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME) .withMountPath(INIT_CONTAINER_PROPERTIES_FILE_DIR) @@ -73,6 +86,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .addToVolumeMounts(sharedVolumeMounts: _*) .addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH) .build() + val podWithBasicVolumes = new PodBuilder(podWithDetachedInitContainer.pod) .editSpec() .addNewVolume() @@ -95,6 +109,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .endVolume() .endSpec() .build() + val mainContainerWithMountedFiles = new ContainerBuilder( podWithDetachedInitContainer.mainContainer) .addToVolumeMounts(sharedVolumeMounts: _*) @@ -103,6 +118,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .withValue(filesDownloadPath) .endEnv() .build() + PodWithDetachedInitContainer( podWithBasicVolumes, initContainer, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala index 0e35e04ff5803..8636f5825ce26 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala @@ -121,6 +121,8 @@ package object config extends Logging { private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv." + private[spark] val KUBERNETES_INITCONTAINER_ENV_KEY = "spark.kubernetes.initContainerEnv." + private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index 07a45dc1f1613..083201e858c7c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -43,7 +43,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( private val submittedResourcesSecretName = s"$kubernetesResourceNamePrefix-init-secret" private val resourceStagingServerUri = submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI) private val resourceStagingServerInternalUri = - submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI) + submissionSparkConf.get(RESOURCE_STAGING_SERVER_INTERNAL_URI) private val initContainerImage = submissionSparkConf.get(INIT_CONTAINER_DOCKER_IMAGE) private val downloadTimeoutMinutes = submissionSparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT) private val maybeResourceStagingServerInternalTrustStore = @@ -92,46 +92,48 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( def getAllConfigurationSteps(): Seq[InitContainerConfigurationStep] = { val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( - initContainerImage, - dockerImagePullPolicy, - jarsDownloadPath, - filesDownloadPath, - downloadTimeoutMinutes, - initContainerConfigMapName, - initContainerConfigMapKey) + initContainerImage, + dockerImagePullPolicy, + jarsDownloadPath, + filesDownloadPath, + downloadTimeoutMinutes, + initContainerConfigMapName, + initContainerConfigMapKey, + submissionSparkConf) val baseInitContainerStep = new BaseInitContainerConfigurationStep( - sparkJars, - sparkFiles, - jarsDownloadPath, - filesDownloadPath, - initContainerConfigMapName, - initContainerConfigMapKey, - initContainerBootstrap) - val submittedResourcesInitContainerStep = resourceStagingServerUri.map { - stagingServerUri => + sparkJars, + sparkFiles, + jarsDownloadPath, + filesDownloadPath, + initContainerConfigMapName, + initContainerConfigMapKey, + initContainerBootstrap) + + val submittedResourcesInitContainerStep = resourceStagingServerUri.map { stagingServerUri => val mountSecretPlugin = new InitContainerResourceStagingServerSecretPluginImpl( - submittedResourcesSecretName, - INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH) + submittedResourcesSecretName, + INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH) val submittedDependencyUploader = new SubmittedDependencyUploaderImpl( - driverLabels, - namespace, - stagingServerUri, - sparkJars, - sparkFiles, - new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions, - RetrofitClientFactoryImpl) + driverLabels, + namespace, + stagingServerUri, + sparkJars, + sparkFiles, + new ResourceStagingServerSslOptionsProviderImpl(submissionSparkConf).getSslOptions, + RetrofitClientFactoryImpl) new SubmittedResourcesInitContainerConfigurationStep( - submittedResourcesSecretName, - resourceStagingServerInternalUri.getOrElse(stagingServerUri), - INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH, - resourceStagingServerInternalSslEnabled, - maybeResourceStagingServerInternalTrustStore, - maybeResourceStagingServerInternalClientCert, - maybeResourceStagingServerInternalTrustStorePassword, - maybeResourceStagingServerInternalTrustStoreType, - submittedDependencyUploader, - mountSecretPlugin) + submittedResourcesSecretName, + resourceStagingServerInternalUri.getOrElse(stagingServerUri), + INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH, + resourceStagingServerInternalSslEnabled, + maybeResourceStagingServerInternalTrustStore, + maybeResourceStagingServerInternalClientCert, + maybeResourceStagingServerInternalTrustStorePassword, + maybeResourceStagingServerInternalTrustStoreType, + submittedDependencyUploader, + mountSecretPlugin) } + Seq(baseInitContainerStep) ++ submittedResourcesInitContainerStep.toSeq } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 60260e2931c29..f2d7d0a29f55a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.ThreadUtils private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { @@ -78,7 +78,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION), sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT), configMap, - configMapKey) + configMapKey, + sparkConf) } val mountSmallFilesBootstrap = for { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala index 946fde95ca80b..b051c275852b9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala @@ -16,14 +16,17 @@ */ package org.apache.spark.deploy.k8s +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model._ import org.scalatest.BeforeAndAfter -import scala.collection.JavaConverters._ +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.SparkFunSuite class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAfter { + private val INIT_CONTAINER_IMAGE = "spark-init:latest" private val DOCKER_IMAGE_PULL_POLICY = "IfNotPresent" private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" @@ -40,7 +43,8 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf FILES_DOWNLOAD_PATH, DOWNLOAD_TIMEOUT_MINUTES, INIT_CONTAINER_CONFIG_MAP_NAME, - INIT_CONTAINER_CONFIG_MAP_KEY) + INIT_CONTAINER_CONFIG_MAP_KEY, + new SparkConf()) private val expectedSharedVolumeMap = Map( JARS_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME, FILES_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME) @@ -60,6 +64,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(initContainer.getImagePullPolicy === DOCKER_IMAGE_PULL_POLICY) assert(initContainer.getArgs.asScala.head === INIT_CONTAINER_PROPERTIES_FILE_PATH) } + test("Main: Volume mounts and env") { val returnedPodWithCont = sparkPodInit.bootstrapInitContainerAndVolumes( PodWithDetachedInitContainer( @@ -73,6 +78,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(mainContainer.getEnv.asScala.map(e => (e.getName, e.getValue)).toMap === Map(ENV_MOUNTED_FILES_DIR -> FILES_DOWNLOAD_PATH)) } + test("Pod: Volume Mounts") { val returnedPodWithCont = sparkPodInit.bootstrapInitContainerAndVolumes( PodWithDetachedInitContainer( @@ -93,6 +99,37 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(volumes(2).getEmptyDir === new EmptyDirVolumeSource()) } + test("InitContainer: custom environment variables") { + val sparkConf = new SparkConf() + .set(s"${KUBERNETES_INITCONTAINER_ENV_KEY}env1", "val1") + .set(s"${KUBERNETES_INITCONTAINER_ENV_KEY}env2", "val2") + val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_IMAGE, + DOCKER_IMAGE_PULL_POLICY, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOWNLOAD_TIMEOUT_MINUTES, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + + val returnedPod = initContainerBootstrap.bootstrapInitContainerAndVolumes( + PodWithDetachedInitContainer( + pod = basePod().build(), + initContainer = new Container(), + mainContainer = new ContainerBuilder().withName(MAIN_CONTAINER_NAME).build())) + val initContainer: Container = returnedPod.initContainer + + assert(initContainer.getEnv.size() == 2) + val envVars = initContainer + .getEnv + .asScala + .map(env => (env.getName, env.getValue)) + .toMap + assert(envVars("env1") == "val1") + assert(envVars("env2") == "val2") + } + private def basePod(): PodBuilder = { new PodBuilder() .withNewMetadata() From e592c7384ad403c4c250e08ae9159ecd43ff52fb Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Thu, 30 Nov 2017 14:03:31 -0800 Subject: [PATCH 2/4] Use driver/executor env keys for the init-container --- .../k8s/SparkPodInitContainerBootstrap.scala | 13 ++++-- ...tainerConfigurationStepsOrchestrator.scala | 1 + .../k8s/KubernetesClusterManager.scala | 1 + .../SparkPodInitContainerBootstrapSuite.scala | 40 +++++++++++++++++-- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala index 51e11c2ee49d9..d6884d68b6dce 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala @@ -20,8 +20,8 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ -import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.config.KUBERNETES_INITCONTAINER_ENV_KEY +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ /** @@ -52,6 +52,7 @@ private[spark] class SparkPodInitContainerBootstrapImpl( downloadTimeoutMinutes: Long, initContainerConfigMapName: String, initContainerConfigMapKey: String, + sparkRole: String, sparkConf: SparkConf) extends SparkPodInitContainerBootstrap { @@ -67,13 +68,19 @@ private[spark] class SparkPodInitContainerBootstrapImpl( .withMountPath(filesDownloadPath) .build()) - val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(KUBERNETES_INITCONTAINER_ENV_KEY) + val initContainerCustomEnvVarKeyPrefix = sparkRole match { + case SPARK_POD_DRIVER_ROLE => KUBERNETES_DRIVER_ENV_KEY + case SPARK_POD_EXECUTOR_ROLE => "spark.executorEnv." + case _ => throw new SparkException(s"$sparkRole is not a valid Spark pod role") + } + val initContainerCustomEnvVars = sparkConf.getAllWithPrefix(initContainerCustomEnvVarKeyPrefix) .toSeq .map(env => new EnvVarBuilder() .withName(env._1) .withValue(env._2) .build()) + val initContainer = new ContainerBuilder(podWithDetachedInitContainer.initContainer) .withName(s"spark-init") .withImage(initContainerImage) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index 083201e858c7c..48808da193a44 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -99,6 +99,7 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( downloadTimeoutMinutes, initContainerConfigMapName, initContainerConfigMapKey, + SPARK_POD_DRIVER_ROLE, submissionSparkConf) val baseInitContainerStep = new BaseInitContainerConfigurationStep( sparkJars, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index f2d7d0a29f55a..ac16172fe5a8a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -79,6 +79,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT), configMap, configMapKey, + SPARK_POD_EXECUTOR_ROLE, sparkConf) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala index b051c275852b9..d850b2a694f20 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrapSuite.scala @@ -44,6 +44,7 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf DOWNLOAD_TIMEOUT_MINUTES, INIT_CONTAINER_CONFIG_MAP_NAME, INIT_CONTAINER_CONFIG_MAP_KEY, + SPARK_POD_DRIVER_ROLE, new SparkConf()) private val expectedSharedVolumeMap = Map( JARS_DOWNLOAD_PATH -> INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME, @@ -99,10 +100,10 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf assert(volumes(2).getEmptyDir === new EmptyDirVolumeSource()) } - test("InitContainer: custom environment variables") { + test("InitContainer: driver custom environment variables") { val sparkConf = new SparkConf() - .set(s"${KUBERNETES_INITCONTAINER_ENV_KEY}env1", "val1") - .set(s"${KUBERNETES_INITCONTAINER_ENV_KEY}env2", "val2") + .set(s"${KUBERNETES_DRIVER_ENV_KEY}env1", "val1") + .set(s"${KUBERNETES_DRIVER_ENV_KEY}env2", "val2") val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( INIT_CONTAINER_IMAGE, DOCKER_IMAGE_PULL_POLICY, @@ -111,6 +112,39 @@ class SparkPodInitContainerBootstrapSuite extends SparkFunSuite with BeforeAndAf DOWNLOAD_TIMEOUT_MINUTES, INIT_CONTAINER_CONFIG_MAP_NAME, INIT_CONTAINER_CONFIG_MAP_KEY, + SPARK_POD_DRIVER_ROLE, + sparkConf) + + val returnedPod = initContainerBootstrap.bootstrapInitContainerAndVolumes( + PodWithDetachedInitContainer( + pod = basePod().build(), + initContainer = new Container(), + mainContainer = new ContainerBuilder().withName(MAIN_CONTAINER_NAME).build())) + val initContainer: Container = returnedPod.initContainer + + assert(initContainer.getEnv.size() == 2) + val envVars = initContainer + .getEnv + .asScala + .map(env => (env.getName, env.getValue)) + .toMap + assert(envVars("env1") == "val1") + assert(envVars("env2") == "val2") + } + + test("InitContainer: executor custom environment variables") { + val sparkConf = new SparkConf() + .set(s"spark.executorEnv.env1", "val1") + .set(s"spark.executorEnv.env2", "val2") + val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( + INIT_CONTAINER_IMAGE, + DOCKER_IMAGE_PULL_POLICY, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOWNLOAD_TIMEOUT_MINUTES, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + SPARK_POD_EXECUTOR_ROLE, sparkConf) val returnedPod = initContainerBootstrap.bootstrapInitContainerAndVolumes( From 6517f8da2aecdd20024c3d22787791a7518fe825 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Fri, 1 Dec 2017 10:59:36 -0800 Subject: [PATCH 3/4] Mount user-specified driver/executor secrets --- .../org/apache/spark/deploy/k8s/config.scala | 2 - ...tainerConfigurationStepsOrchestrator.scala | 19 +++++- .../InitContainerMountSecretsStep.scala | 40 +++++++++++ .../cluster/k8s/ExecutorPodFactory.scala | 16 +++-- .../k8s/KubernetesClusterManager.scala | 6 ++ ...rConfigurationStepsOrchestratorSuite.scala | 26 +++++++ .../InitContainerMountSecretsStepSuite.scala | 56 ++++++++++++++++ .../cluster/k8s/ExecutorPodFactorySuite.scala | 67 +++++++++++++++---- 8 files changed, 207 insertions(+), 25 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala index 8636f5825ce26..0e35e04ff5803 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala @@ -121,8 +121,6 @@ package object config extends Logging { private[spark] val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv." - private[spark] val KUBERNETES_INITCONTAINER_ENV_KEY = "spark.kubernetes.initContainerEnv." - private[spark] val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets." private[spark] val KUBERNETES_EXECUTOR_SECRETS_PREFIX = "spark.kubernetes.executor.secrets." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index 48808da193a44..471285d087804 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -17,10 +17,10 @@ package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, SubmittedDependencyUploaderImpl} +import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl} import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl} import org.apache.spark.util.Utils @@ -135,6 +135,19 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( mountSecretPlugin) } - Seq(baseInitContainerStep) ++ submittedResourcesInitContainerStep.toSeq + val driverSecretNamesToMountPaths = ConfigurationUtils.parsePrefixedKeyValuePairs( + submissionSparkConf, + KUBERNETES_DRIVER_SECRETS_PREFIX, + "driver secrets") + val mountSecretsStep = if (driverSecretNamesToMountPaths.nonEmpty) { + val mountSecretsBootstrap = new MountSecretsBootstrapImpl(driverSecretNamesToMountPaths) + Some(new InitContainerMountSecretsStep(mountSecretsBootstrap)) + } else { + None + } + + Seq(baseInitContainerStep) ++ + submittedResourcesInitContainerStep.toSeq ++ + mountSecretsStep.toSeq } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala new file mode 100644 index 0000000000000..a2df3d35b8c84 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStep.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer + +import org.apache.spark.deploy.k8s.submit.MountSecretsBootstrap + +/** + * An init-container configuration step for mounting user-specified secrets onto user-specified + * paths. + * + * @param mountSecretsBootstrap a utility actually handling mounting of the secrets. + */ +private[spark] class InitContainerMountSecretsStep( + mountSecretsBootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep { + + override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = { + val (podWithSecretsMounted, initContainerWithSecretsMounted) = + mountSecretsBootstrap.mountSecrets( + initContainerSpec.podToInitialize, + initContainerSpec.initContainer) + initContainerSpec.copy( + podToInitialize = podWithSecretsMounted, + initContainer = initContainerWithSecretsMounted + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 98a0d879b6a58..73e08b170a4a0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.scheduler.cluster.k8s -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} import scala.collection.JavaConverters._ -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ @@ -44,6 +44,7 @@ private[spark] class ExecutorPodFactoryImpl( mountSecretsBootstrap: Option[MountSecretsBootstrap], mountSmallFilesBootstrap: Option[MountSmallFilesBootstrap], executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], + executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider) extends ExecutorPodFactory { @@ -82,9 +83,6 @@ private[spark] class ExecutorPodFactoryImpl( private val executorPort = sparkConf.getInt("spark.executor.port", DEFAULT_STATIC_PORT) private val blockmanagerPort = sparkConf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) - private val kubernetesDriverPodName = sparkConf - .get(KUBERNETES_DRIVER_POD_NAME) - .getOrElse(throw new SparkException("Must specify the driver pod name")) private val executorPodNamePrefix = sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) @@ -234,6 +232,7 @@ private[spark] class ExecutorPodFactoryImpl( bootstrap.mountSmallFilesSecret( withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer) }.getOrElse((withMaybeSecretsMountedPod, withMaybeSecretsMountedContainer)) + val (executorPodWithInitContainer, initBootstrappedExecutorContainer) = executorInitContainerBootstrap.map { bootstrap => val podWithDetachedInitContainer = bootstrap.bootstrapInitContainerAndVolumes( @@ -247,8 +246,13 @@ private[spark] class ExecutorPodFactoryImpl( podWithDetachedInitContainer.initContainer) }.getOrElse(podWithDetachedInitContainer.initContainer) + val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) = + executorInitContainerMountSecretsBootstrap.map { bootstrap => + bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer) + }.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer) + val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( - podWithDetachedInitContainer.pod, resolvedInitContainer) + mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) val resolvedPodWithMountedSecret = executorMountInitContainerSecretPlugin.map { plugin => plugin.addResourceStagingServerSecretVolumeToPod(podWithAttachedInitContainer) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index ac16172fe5a8a..709f3b72b92f5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -97,6 +97,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit } else { None } + val executorInitContainerMountSecretsBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) { + Some(new MountSecretsBootstrapImpl(executorSecretNamesToMountPaths)) + } else { + None + } if (maybeInitContainerConfigMap.isEmpty) { logWarning("The executor's init-container config map was not specified. Executors will" + @@ -135,6 +140,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit mountSecretBootstrap, mountSmallFilesBootstrap, executorInitContainerBootstrap, + executorInitContainerMountSecretsBootstrap, executorInitContainerSecretVolumePlugin, executorLocalDirVolumeProvider) val allocatorExecutor = ThreadUtils diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala index 3810a324f99b5..fad178ab8bfa7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestratorSuite.scala @@ -43,6 +43,9 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map" private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key" private val STAGING_SERVER_URI = "http://localhost:8000" + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/init-container" test ("error thrown if local jars provided without resource staging server") { val sparkConf = new SparkConf(true) @@ -160,4 +163,27 @@ class InitContainerConfigurationStepsOrchestratorSuite extends SparkFunSuite { assert(initSteps.length === 1) assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) } + + test("including step to mount user-specified secrets") { + val sparkConf = new SparkConf(false) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH) + val orchestrator = new InitContainerConfigurationStepsOrchestrator( + NAMESPACE, + APP_RESOURCE_PREFIX, + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + DRIVER_LABELS, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps : Seq[InitContainerConfigurationStep] = + orchestrator.getAllConfigurationSteps() + assert(initSteps.length === 2) + assert(initSteps.head.isInstanceOf[BaseInitContainerConfigurationStep]) + assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep]) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala new file mode 100644 index 0000000000000..9ac48126ca381 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountSecretsStepSuite.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, SecretVolumeUtils} + +class InitContainerMountSecretsStepSuite extends SparkFunSuite { + + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/init-container" + + test("Mounts all given secrets") { + val baseInitContainerSpec = InitContainerSpec( + Map.empty, + Map.empty, + new ContainerBuilder().build(), + new ContainerBuilder().build(), + new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(), + Seq.empty) + val secretNamesToMountPaths = Map( + SECRET_FOO -> SECRET_MOUNT_PATH, + SECRET_BAR -> SECRET_MOUNT_PATH) + + val mountSecretsBootstrap = new MountSecretsBootstrapImpl(secretNamesToMountPaths) + val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap) + val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer( + baseInitContainerSpec) + + val podWithSecretsMounted = configuredInitContainerSpec.podToInitialize + val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer + + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName => + assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName))) + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName => + assert(SecretVolumeUtils.containerHasVolume( + initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 71204a5aa1deb..14cadb4acaa0f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -77,6 +77,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -111,13 +112,14 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") val factory = new ExecutorPodFactoryImpl( - conf, - nodeAffinityExecutorPodModifier, - None, - None, - None, - None, - executorLocalDirVolumeProvider) + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -138,6 +140,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -172,6 +175,34 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, Some(initContainerBootstrap), None, + None, + executorLocalDirVolumeProvider) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + + assert(executor.getSpec.getInitContainers.size() === 1) + + checkOwnerReferences(executor, driverPodUid) + } + + test("init-container with secrets mount bootstrap") { + val conf = baseConf.clone() + val initContainerBootstrap = mock(classOf[SparkPodInitContainerBootstrap]) + when(initContainerBootstrap.bootstrapInitContainerAndVolumes( + any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1")) + + val factory = new ExecutorPodFactoryImpl( + conf, + nodeAffinityExecutorPodModifier, + None, + None, + Some(initContainerBootstrap), + Some(secretsBootstrap), + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -180,6 +211,11 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) assert(executor.getSpec.getInitContainers.size() === 1) + assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName + === "secret1-volume") + assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0) + .getMountPath === "/var/secret1") + checkOwnerReferences(executor, driverPodUid) } @@ -202,6 +238,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -223,6 +260,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef Some(smallFiles), None, None, + None, executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -250,13 +288,14 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") val factory = new ExecutorPodFactoryImpl( - conf, - nodeAffinityExecutorPodModifier, - None, - None, - None, - None, - executorLocalDirVolumeProvider) + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) From a10db338b46a1d9243ce061544c66fee80ce571c Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Fri, 1 Dec 2017 11:17:30 -0800 Subject: [PATCH 4/4] Addressed comments --- .../spark/deploy/k8s/SparkPodInitContainerBootstrap.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala index d6884d68b6dce..4a0e651c50405 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkPodInitContainerBootstrap.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, EnvVarBuilder, PodBuilder, VolumeMount, VolumeMountBuilder} import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.config._