From ae8ecf5028fdedf92387f478ae867e3dd1037f2a Mon Sep 17 00:00:00 2001 From: Jun Gong Date: Tue, 26 Dec 2017 16:46:15 +0800 Subject: [PATCH] Add support for fetching application dependencies from HDFS --- ...DriverConfigurationStepsOrchestrator.scala | 3 +- .../k8s/submit/MountHadoopConfStep.scala | 50 +++++++++++++++++++ ...tainerConfigurationStepsOrchestrator.scala | 15 ++++-- .../InitContainerMountHadoopConfStep.scala | 33 ++++++++++++ .../cluster/k8s/ExecutorPodFactory.scala | 13 +++-- .../k8s/KubernetesClusterManager.scala | 8 ++- .../cluster/k8s/ExecutorPodFactorySuite.scala | 11 ++++ 7 files changed, 125 insertions(+), 8 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MountHadoopConfStep.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountHadoopConfStep.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index edd27e81f03f..43ec461259e7 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -183,7 +183,8 @@ private[spark] class DriverConfigurationStepsOrchestrator( allDriverLabels, initContainerConfigMapName, INIT_CONTAINER_CONFIG_MAP_KEY, - submissionSparkConf) + submissionSparkConf, + hadoopConfDir.isDefined) val initContainerConfigurationSteps = initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps() Some(new InitContainerBootstrapStep(initContainerConfigurationSteps, diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MountHadoopConfStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MountHadoopConfStep.scala new file mode 100644 index 000000000000..5977c0bb2c6b --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/MountHadoopConfStep.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder} + +import org.apache.spark.deploy.k8s.constants._ + +/** + * Bootstraps a container with the Hadoop conf volume mounted and the conf dir env variable set. + */ +private[spark] trait MountHadoopConfStep { + /** + * Mounts the Hadoop conf volume into the given container. + * + * @param container the container into which the Hadoop conf volume is mounted. + * @return the updated container with the Hadoop conf volume mounted.
+ */ + def mountHadoopConf(container: Container): Container +} + +private[spark] class MountHadoopConfStepImpl extends MountHadoopConfStep { + def mountHadoopConf(container: Container): Container = { + new ContainerBuilder(container) + .addNewVolumeMount() + .withName(HADOOP_FILE_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + } +} + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala index 471285d08780..f4a2554fcbc6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala @@ -20,7 +20,7 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl} +import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountHadoopConfStepImpl, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl} import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl} import org.apache.spark.util.Utils @@ -38,7 +38,8 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( driverLabels:
Map[String, String], initContainerConfigMapName: String, initContainerConfigMapKey: String, - submissionSparkConf: SparkConf) { + submissionSparkConf: SparkConf, + hadoopConfEnabled: Boolean = false) { private val submittedResourcesSecretName = s"$kubernetesResourceNamePrefix-init-secret" private val resourceStagingServerUri = submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI) @@ -146,8 +147,16 @@ private[spark] class InitContainerConfigurationStepsOrchestrator( None } + val mountHadoopConfStep = if (hadoopConfEnabled) { + val mountHadoopConfStep = new MountHadoopConfStepImpl + Some(new InitContainerMountHadoopConfStep(mountHadoopConfStep)) + } else { + None + } + Seq(baseInitContainerStep) ++ submittedResourcesInitContainerStep.toSeq ++ - mountSecretsStep.toSeq + mountSecretsStep.toSeq ++ + mountHadoopConfStep.toSeq } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountHadoopConfStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountHadoopConfStep.scala new file mode 100644 index 000000000000..c8278787620d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerMountHadoopConfStep.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer + +import org.apache.spark.deploy.k8s.submit.MountHadoopConfStep + +/** + * An init-container configuration step that mounts Hadoop conf via the given MountHadoopConfStep. + */ +private[spark] class InitContainerMountHadoopConfStep( + mountHadoopConfStep: MountHadoopConfStep) extends InitContainerConfigurationStep { + override def configureInitContainer(initContainerSpec: InitContainerSpec) : InitContainerSpec = { + val initContainerWithHadoopConfMounted = mountHadoopConfStep.mountHadoopConf( + initContainerSpec.initContainer) + initContainerSpec.copy( + initContainer = initContainerWithHadoopConfMounted + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 1c7ccba394a3..901a1417e15e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -24,7 +24,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, HadoopConfSparkUserBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap} import
org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap} +import org.apache.spark.deploy.k8s.submit.{InitContainerUtil, MountHadoopConfStep, MountSecretsBootstrap, MountSmallFilesBootstrap} import org.apache.spark.util.Utils // Configures executor pods. Construct one of these with a SparkConf to set up properties that are @@ -47,6 +47,7 @@ private[spark] class ExecutorPodFactoryImpl( executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], + executorInitContainerMountHadoopConfBootstrap: Option[MountHadoopConfStep], executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider, hadoopBootStrap: Option[HadoopConfBootstrap], kerberosBootstrap: Option[KerberosTokenConfBootstrap], @@ -256,10 +257,16 @@ private[spark] class ExecutorPodFactoryImpl( podWithDetachedInitContainer.initContainer) }.getOrElse(podWithDetachedInitContainer.initContainer) + val maybeInitContainerWithHadoopConfMounted = + executorInitContainerMountHadoopConfBootstrap.map { bootstrap => + bootstrap.mountHadoopConf(resolvedInitContainer) + }.getOrElse(resolvedInitContainer) + val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) = executorInitContainerMountSecretsBootstrap.map { bootstrap => - bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer) - }.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer) + bootstrap.mountSecrets(podWithDetachedInitContainer.pod, + maybeInitContainerWithHadoopConfMounted) + }.getOrElse(podWithDetachedInitContainer.pod, maybeInitContainerWithHadoopConfMounted) val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer( mayBePodWithSecretsMountedToInitContainer, 
mayBeInitContainerWithSecretsMounted) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index bd90766d0700..68ed31ca2bf1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -24,7 +24,7 @@ import org.apache.spark.SparkContext import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} +import org.apache.spark.deploy.k8s.submit.{MountHadoopConfStepImpl, MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} import org.apache.spark.internal.Logging import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl @@ -135,6 +135,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit } else { None } + val executorInitContainerMountHadoopConfBootstrap = if (hadoopBootStrap.isDefined) { + Some(new MountHadoopConfStepImpl) + } else { + None + } if (maybeInitContainerConfigMap.isEmpty) { logWarning("The executor's init-container config map was not specified. 
Executors will" + @@ -181,6 +186,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit executorInitContainerBootstrap, executorInitContainerMountSecretsBootstrap, executorInitContainerSecretVolumePlugin, + executorInitContainerMountHadoopConfBootstrap, executorLocalDirVolumeProvider, hadoopBootStrap, kerberosBootstrap, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 47493c827ddb..97ac93bd0195 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -89,6 +89,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -133,6 +134,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -158,6 +160,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -196,6 +199,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef Some(initContainerBootstrap), None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -226,6 +230,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef Some(initContainerBootstrap), Some(secretsBootstrap), None, + None, executorLocalDirVolumeProvider, None, None, @@ -265,6 +270,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -289,6 +295,7 
@@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -326,6 +333,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, None, None, @@ -362,6 +370,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, Some(hadoopBootsrap), None, @@ -400,6 +409,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, Some(hadoopBootstrap), None, @@ -444,6 +454,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, + None, executorLocalDirVolumeProvider, Some(hadoopBootstrap), Some(kerberosBootstrap),