Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit ae8ecf5

Browse files
committed
Add support for fetching application dependencies from HDFS
1 parent 6428bb9 commit ae8ecf5

File tree

7 files changed

+125
-8
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,8 @@ private[spark] class DriverConfigurationStepsOrchestrator(
183183
allDriverLabels,
184184
initContainerConfigMapName,
185185
INIT_CONTAINER_CONFIG_MAP_KEY,
186-
submissionSparkConf)
186+
submissionSparkConf,
187+
hadoopConfDir.isDefined)
187188
val initContainerConfigurationSteps =
188189
initContainerConfigurationStepsOrchestrator.getAllConfigurationSteps()
189190
Some(new InitContainerBootstrapStep(initContainerConfigurationSteps,
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.submit
18+
19+
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}
20+
21+
import org.apache.spark.deploy.k8s.constants._
22+
23+
/**
 * Represents a step that mounts the Hadoop configuration directory into a container.
 */
private[spark] trait MountHadoopConfStep {
  /**
   * Mounts the Hadoop conf volume and exposes the HADOOP_CONF_DIR environment
   * variable in the given container.
   *
   * @param container the container into which volumes are being mounted.
   * @return the updated container with hadoop conf volumes mounted.
   */
  def mountHadoopConf(container: Container): Container
}
35+
36+
private[spark] class MountHadoopConfStepImpl extends MountHadoopConfStep {
  /**
   * Adds a volume mount for the Hadoop conf files at HADOOP_CONF_DIR_PATH and
   * sets the HADOOP_CONF_DIR environment variable to point at that path.
   */
  def mountHadoopConf(container: Container): Container = {
    val containerBuilder = new ContainerBuilder(container)
    containerBuilder
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_CONF_DIR_PATH)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_CONF_DIR)
        .withValue(HADOOP_CONF_DIR_PATH)
        .endEnv()
      .build()
  }
}
50+

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/initcontainer/InitContainerConfigurationStepsOrchestrator.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.apache.spark.SparkConf
2020
import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrapImpl}
2121
import org.apache.spark.deploy.k8s.config._
2222
import org.apache.spark.deploy.k8s.constants._
23-
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
23+
import org.apache.spark.deploy.k8s.submit.{KubernetesFileUtils, MountHadoopConfStepImpl, MountSecretsBootstrapImpl, SubmittedDependencyUploaderImpl}
2424
import org.apache.spark.deploy.rest.k8s.{ResourceStagingServerSslOptionsProviderImpl, RetrofitClientFactoryImpl}
2525
import org.apache.spark.util.Utils
2626

@@ -38,7 +38,8 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
3838
driverLabels: Map[String, String],
3939
initContainerConfigMapName: String,
4040
initContainerConfigMapKey: String,
41-
submissionSparkConf: SparkConf) {
41+
submissionSparkConf: SparkConf,
42+
hadoopConfEnabled: Boolean = false) {
4243

4344
private val submittedResourcesSecretName = s"$kubernetesResourceNamePrefix-init-secret"
4445
private val resourceStagingServerUri = submissionSparkConf.get(RESOURCE_STAGING_SERVER_URI)
@@ -146,8 +147,16 @@ private[spark] class InitContainerConfigurationStepsOrchestrator(
146147
None
147148
}
148149

150+
val mountHadoopConfStep = if (hadoopConfEnabled) {
151+
val mountHadoopConfStep = new MountHadoopConfStepImpl
152+
Some(new InitContainerMountHadoopConfStep(mountHadoopConfStep))
153+
} else {
154+
None
155+
}
156+
149157
Seq(baseInitContainerStep) ++
150158
submittedResourcesInitContainerStep.toSeq ++
151-
mountSecretsStep.toSeq
159+
mountSecretsStep.toSeq ++
160+
mountHadoopConfStep.toSeq
152161
}
153162
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer
18+
19+
import org.apache.spark.deploy.k8s.submit.MountHadoopConfStep
20+
21+
/**
 * An init-container configuration step for mounting hadoop conf files onto
 * HADOOP_CONF_DIR_PATH.
 *
 * @param mountHadoopConfStep the bootstrap that performs the actual volume and
 *                            environment-variable wiring on the container.
 */
private[spark] class InitContainerMountHadoopConfStep(
    mountHadoopConfStep: MountHadoopConfStep) extends InitContainerConfigurationStep {

  override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = {
    // Delegate the mounting work to the shared MountHadoopConfStep, then carry
    // the updated init-container forward in the returned spec.
    val updatedInitContainer =
      mountHadoopConfStep.mountHadoopConf(initContainerSpec.initContainer)
    initContainerSpec.copy(initContainer = updatedInitContainer)
  }
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.{SparkConf, SparkException}
2424
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, HadoopConfSparkUserBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap}
2525
import org.apache.spark.deploy.k8s.config._
2626
import org.apache.spark.deploy.k8s.constants._
27-
import org.apache.spark.deploy.k8s.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap}
27+
import org.apache.spark.deploy.k8s.submit.{InitContainerUtil, MountHadoopConfStep, MountSecretsBootstrap, MountSmallFilesBootstrap}
2828
import org.apache.spark.util.Utils
2929

3030
// Configures executor pods. Construct one of these with a SparkConf to set up properties that are
@@ -47,6 +47,7 @@ private[spark] class ExecutorPodFactoryImpl(
4747
executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap],
4848
executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap],
4949
executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin],
50+
executorInitContainerMountHadoopConfBootstrap: Option[MountHadoopConfStep],
5051
executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider,
5152
hadoopBootStrap: Option[HadoopConfBootstrap],
5253
kerberosBootstrap: Option[KerberosTokenConfBootstrap],
@@ -256,10 +257,16 @@ private[spark] class ExecutorPodFactoryImpl(
256257
podWithDetachedInitContainer.initContainer)
257258
}.getOrElse(podWithDetachedInitContainer.initContainer)
258259

260+
val maybeInitContainerWithHadoopConfMounted =
261+
executorInitContainerMountHadoopConfBootstrap.map { bootstrap =>
262+
bootstrap.mountHadoopConf(resolvedInitContainer)
263+
}.getOrElse(resolvedInitContainer)
264+
259265
val (mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted) =
260266
executorInitContainerMountSecretsBootstrap.map { bootstrap =>
261-
bootstrap.mountSecrets(podWithDetachedInitContainer.pod, resolvedInitContainer)
262-
}.getOrElse(podWithDetachedInitContainer.pod, resolvedInitContainer)
267+
bootstrap.mountSecrets(podWithDetachedInitContainer.pod,
268+
maybeInitContainerWithHadoopConfMounted)
269+
}.getOrElse(podWithDetachedInitContainer.pod, maybeInitContainerWithHadoopConfMounted)
263270

264271
val podWithAttachedInitContainer = InitContainerUtil.appendInitContainer(
265272
mayBePodWithSecretsMountedToInitContainer, mayBeInitContainerWithSecretsMounted)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.SparkContext
2424
import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl}
2525
import org.apache.spark.deploy.k8s.config._
2626
import org.apache.spark.deploy.k8s.constants._
27-
import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
27+
import org.apache.spark.deploy.k8s.submit.{MountHadoopConfStepImpl, MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl}
2828
import org.apache.spark.internal.Logging
2929
import org.apache.spark.network.netty.SparkTransportConf
3030
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl
@@ -135,6 +135,11 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
135135
} else {
136136
None
137137
}
138+
val executorInitContainerMountHadoopConfBootstrap = if (hadoopBootStrap.isDefined) {
139+
Some(new MountHadoopConfStepImpl)
140+
} else {
141+
None
142+
}
138143

139144
if (maybeInitContainerConfigMap.isEmpty) {
140145
logWarning("The executor's init-container config map was not specified. Executors will" +
@@ -181,6 +186,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
181186
executorInitContainerBootstrap,
182187
executorInitContainerMountSecretsBootstrap,
183188
executorInitContainerSecretVolumePlugin,
189+
executorInitContainerMountHadoopConfBootstrap,
184190
executorLocalDirVolumeProvider,
185191
hadoopBootStrap,
186192
kerberosBootstrap,

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
8989
None,
9090
None,
9191
None,
92+
None,
9293
executorLocalDirVolumeProvider,
9394
None,
9495
None,
@@ -133,6 +134,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
133134
None,
134135
None,
135136
None,
137+
None,
136138
executorLocalDirVolumeProvider,
137139
None,
138140
None,
@@ -158,6 +160,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
158160
None,
159161
None,
160162
None,
163+
None,
161164
executorLocalDirVolumeProvider,
162165
None,
163166
None,
@@ -196,6 +199,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
196199
Some(initContainerBootstrap),
197200
None,
198201
None,
202+
None,
199203
executorLocalDirVolumeProvider,
200204
None,
201205
None,
@@ -226,6 +230,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
226230
Some(initContainerBootstrap),
227231
Some(secretsBootstrap),
228232
None,
233+
None,
229234
executorLocalDirVolumeProvider,
230235
None,
231236
None,
@@ -265,6 +270,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
265270
None,
266271
None,
267272
None,
273+
None,
268274
executorLocalDirVolumeProvider,
269275
None,
270276
None,
@@ -289,6 +295,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
289295
None,
290296
None,
291297
None,
298+
None,
292299
executorLocalDirVolumeProvider,
293300
None,
294301
None,
@@ -326,6 +333,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
326333
None,
327334
None,
328335
None,
336+
None,
329337
executorLocalDirVolumeProvider,
330338
None,
331339
None,
@@ -362,6 +370,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
362370
None,
363371
None,
364372
None,
373+
None,
365374
executorLocalDirVolumeProvider,
366375
Some(hadoopBootsrap),
367376
None,
@@ -400,6 +409,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
400409
None,
401410
None,
402411
None,
412+
None,
403413
executorLocalDirVolumeProvider,
404414
Some(hadoopBootstrap),
405415
None,
@@ -444,6 +454,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
444454
None,
445455
None,
446456
None,
457+
None,
447458
executorLocalDirVolumeProvider,
448459
Some(hadoopBootstrap),
449460
Some(kerberosBootstrap),

0 commit comments

Comments (0)