From c466ef6534f7e34d475d8fb4230a8281b3928f83 Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Mon, 19 Jun 2017 22:43:14 +0800 Subject: [PATCH] Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs --- .../deploy/kubernetes/submit/Client.scala | 41 ++++++++++--------- ...riverInitContainerComponentsProvider.scala | 29 +++++++++---- .../submit/InitContainerBundle.scala | 26 ++++++++++++ .../submit/KubernetesFileUtils.scala | 4 ++ .../kubernetes/submit/ClientV2Suite.scala | 24 ++++------- 5 files changed, 80 insertions(+), 44 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/InitContainerBundle.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index a9699d8c34b4e..ac3a51e74f838 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -156,31 +156,33 @@ private[spark] class Client( .addToContainers(driverContainer) .endSpec() - val maybeSubmittedDependencyUploader = initContainerComponentsProvider - .provideInitContainerSubmittedDependencyUploader(allDriverLabels) - val maybeSubmittedResourceIdentifiers = maybeSubmittedDependencyUploader.map { uploader => + val maybeSubmittedResourceIdentifiers = initContainerComponentsProvider + .provideInitContainerSubmittedDependencyUploader(allDriverLabels) + .map { uploader => SubmittedResources(uploader.uploadJars(), uploader.uploadFiles()) } - val maybeSecretBuilder = initContainerComponentsProvider - .provideSubmittedDependenciesSecretBuilder( - maybeSubmittedResourceIdentifiers.map(_.secrets())) - val maybeSubmittedDependenciesSecret = maybeSecretBuilder.map(_.build()) - val initContainerConfigMap = initContainerComponentsProvider - .provideInitContainerConfigMapBuilder(maybeSubmittedResourceIdentifiers.map(_.ids())) - .build() - val podWithInitContainer = initContainerComponentsProvider - .provideInitContainerBootstrap() - .bootstrapInitContainerAndVolumes(driverContainer.getName, basePod) + val maybeSubmittedDependenciesSecret = initContainerComponentsProvider + .provideSubmittedDependenciesSecretBuilder( + maybeSubmittedResourceIdentifiers.map(_.secrets())) + .map(_.build()) val containerLocalizedFilesResolver = initContainerComponentsProvider - .provideContainerLocalizedFilesResolver() + .provideContainerLocalizedFilesResolver() val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() - val executorInitContainerConfiguration = initContainerComponentsProvider - .provideExecutorInitContainerConfiguration() - val sparkConfWithExecutorInit = executorInitContainerConfiguration - .configureSparkConfForExecutorInitContainer(sparkConf) + val initContainerBundler = initContainerComponentsProvider + .provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()), + resolvedSparkJars ++ resolvedSparkFiles) + + val podWithInitContainer = initContainerBundler.map( + _.sparkPodInitContainerBootstrap + .bootstrapInitContainerAndVolumes(driverContainer.getName, basePod)) + .getOrElse(basePod) + val sparkConfWithExecutorInit = initContainerBundler.map( + _.executorInitContainerConfiguration + .configureSparkConfForExecutorInitContainer(sparkConf)) + .getOrElse(sparkConf) val credentialsMounter = kubernetesCredentialsMounterProvider .getDriverPodKubernetesCredentialsMounter() val credentialsSecret = credentialsMounter.createCredentialsSecret() @@ -224,7 +226,8 @@ private[spark] class Client( .watch(loggingPodStatusWatcher)) { _ => val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) try { - val driverOwnedResources = Seq(initContainerConfigMap) ++ + val driverOwnedResources = initContainerBundler.map( + _.sparkInitContainerConfigMap).toSeq ++ maybeSubmittedDependenciesSecret.toSeq ++ credentialsSecret.toSeq val driverPodOwnerReference = new OwnerReferenceBuilder() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala index cfc61e193dcff..cc1837cce6736 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.kubernetes.submit +import io.fabric8.kubernetes.api.model.ConfigMap + import org.apache.spark.{SparkConf, SSLOptions} import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.kubernetes.config._ @@ -30,17 +32,15 @@ import org.apache.spark.util.Utils */ private[spark] trait DriverInitContainerComponentsProvider { - def provideInitContainerConfigMapBuilder( - maybeSubmittedResourceIds: Option[SubmittedResourceIds]) - : SparkInitContainerConfigMapBuilder def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver - def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration def provideInitContainerSubmittedDependencyUploader( driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader] def provideSubmittedDependenciesSecretBuilder( maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets]) : Option[SubmittedDependencySecretBuilder] def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap + def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds], + uris: Iterable[String]): Option[InitContainerBundle] } private[spark] class DriverInitContainerComponentsProviderImpl( @@ -105,9 +105,8 @@ private[spark] class DriverInitContainerComponentsProviderImpl( private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT) - override def provideInitContainerConfigMapBuilder( - maybeSubmittedResourceIds: Option[SubmittedResourceIds]) - : SparkInitContainerConfigMapBuilder = { + private def provideInitContainerConfigMap( + maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = { val submittedDependencyConfigPlugin = for { stagingServerUri <- maybeResourceStagingServerUri jarsResourceId <- maybeSubmittedResourceIds.map(_.jarsResourceId) @@ -136,7 +135,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( filesDownloadPath, configMapName, configMapKey, - submittedDependencyConfigPlugin) + submittedDependencyConfigPlugin).build() } override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = { @@ -144,7 +143,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath) } - override def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = { + private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = { new ExecutorInitContainerConfigurationImpl( maybeSecretName, INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH, @@ -202,4 +201,16 @@ private[spark] class DriverInitContainerComponentsProviderImpl( configMapKey, resourceStagingServerSecretPlugin) } + + override def provideInitContainerBundle( + maybeSubmittedResourceIds: Option[SubmittedResourceIds], + uris: Iterable[String]): Option[InitContainerBundle] = { + val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver() + // Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs + if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) { + Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds), + provideInitContainerBootstrap(), + provideExecutorInitContainerConfiguration())) + } else None + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/InitContainerBundle.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/InitContainerBundle.scala new file mode 100644 index 0000000000000..ba44f794d5811 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/InitContainerBundle.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.ConfigMap + +import org.apache.spark.deploy.kubernetes.{SparkPodInitContainerBootstrap} + +case class InitContainerBundle( + sparkInitContainerConfigMap: ConfigMap, + sparkPodInitContainerBootstrap: SparkPodInitContainerBootstrap, + executorInitContainerConfiguration: ExecutorInitContainerConfiguration) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala index 1b0af3fa9fb01..d688bf29808fb 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/KubernetesFileUtils.scala @@ -33,6 +33,10 @@ private[spark] object KubernetesFileUtils { filterUriStringsByScheme(uris, _ == "local") } + def getNonContainerLocalFiles(uris: Iterable[String]): Iterable[String] = { + filterUriStringsByScheme(uris, _ != "local") + } + def getOnlySubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = { filterUriStringsByScheme(uris, _ == "file") } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 3945bef5bcfb8..8992a56e20c80 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -123,8 +123,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private val CREDENTIALS_SET_CONF = "spark.kubernetes.driverCredentials.provided" private val CREDENTIALS_SET_ANNOTATION = "credentials-set" - @Mock - private var initContainerConfigMapBuilder: SparkInitContainerConfigMapBuilder = _ @Mock private var containerLocalizedFilesResolver: ContainerLocalizedFilesResolver = _ @Mock @@ -173,12 +171,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { }) when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver()) .thenReturn(containerLocalizedFilesResolver) - when(initContainerComponentsProvider.provideExecutorInitContainerConfiguration()) - .thenReturn(executorInitContainerConfiguration) when(submittedDependenciesSecretBuilder.build()) .thenReturn(INIT_CONTAINER_SECRET) - when(initContainerConfigMapBuilder.build()) - .thenReturn(INIT_CONTAINER_CONFIG_MAP) when(kubernetesClient.pods()).thenReturn(podOps) when(podOps.create(any())).thenAnswer(new Answer[Pod] { override def answer(invocation: InvocationOnMock): Pod = { @@ -214,9 +208,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(initContainerComponentsProvider .provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets()))) .thenReturn(Some(submittedDependenciesSecretBuilder)) - when(initContainerComponentsProvider - .provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids()))) - .thenReturn(initContainerConfigMapBuilder) + when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()), + RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES)) + .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, + initContainerBootstrap, executorInitContainerConfiguration))) runAndVerifyDriverPodHasCorrectProperties() val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) verify(kubernetesClient).resourceList(resourceListArgumentCaptor.capture()) @@ -232,8 +227,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { verifyConfigMapWasCreated(createdResources) verify(submittedDependencyUploader).uploadJars() verify(submittedDependencyUploader).uploadFiles() - verify(initContainerComponentsProvider) - .provideInitContainerConfigMapBuilder(Some(SUBMITTED_RESOURCES.ids())) verify(initContainerComponentsProvider) .provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets())) } @@ -250,8 +243,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { verifyConfigMapWasCreated(createdResources) verify(submittedDependencyUploader, times(0)).uploadJars() verify(submittedDependencyUploader, times(0)).uploadFiles() - verify(initContainerComponentsProvider) - .provideInitContainerConfigMapBuilder(None) verify(initContainerComponentsProvider) .provideSubmittedDependenciesSecretBuilder(None) } @@ -321,9 +312,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(initContainerComponentsProvider .provideSubmittedDependenciesSecretBuilder(None)) .thenReturn(None) - when(initContainerComponentsProvider - .provideInitContainerConfigMapBuilder(None)) - .thenReturn(initContainerConfigMapBuilder) + when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++ + RESOLVED_SPARK_FILES)) + .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, + initContainerBootstrap, executorInitContainerConfiguration))) } private def expectationsForNoMountedCredentials(): Unit = {