Skip to content

Commit c5d5a2d

Browse files
committed
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-container is used
1 parent 95f9659 commit c5d5a2d

File tree

11 files changed

+61
-55
lines changed

11 files changed

+61
-55
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/MountSecretsBootstrap.scala

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,26 +24,36 @@ import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBui
2424
private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String, String]) {
2525

2626
/**
27-
* Mounts Kubernetes secrets as secret volumes into the given container in the given pod.
27+
* Add new secret volumes for the secrets specified in secretNamesToMountPaths into the given pod.
2828
*
2929
* @param pod the pod into which the secret volumes are being added.
30-
* @param container the container into which the secret volumes are being mounted.
31-
* @return the updated pod and container with the secrets mounted.
30+
* @return the updated pod with the secret volumes added.
3231
*/
33-
def mountSecrets(pod: Pod, container: Container): (Pod, Container) = {
32+
def addSecretVolumes(pod: Pod): Pod = {
3433
var podBuilder = new PodBuilder(pod)
3534
secretNamesToMountPaths.keys.foreach { name =>
3635
podBuilder = podBuilder
3736
.editOrNewSpec()
3837
.addNewVolume()
39-
.withName(secretVolumeName(name))
40-
.withNewSecret()
41-
.withSecretName(name)
42-
.endSecret()
43-
.endVolume()
38+
.withName(secretVolumeName(name))
39+
.withNewSecret()
40+
.withSecretName(name)
41+
.endSecret()
42+
.endVolume()
4443
.endSpec()
4544
}
4645

46+
podBuilder.build()
47+
}
48+
49+
/**
50+
* Mounts Kubernetes secret volumes of the secrets specified in secretNamesToMountPaths into the
51+
* given container.
52+
*
53+
* @param container the container into which the secret volumes are being mounted.
54+
* @return the updated container with the secrets mounted.
55+
*/
56+
def mountSecrets(container: Container): Container = {
4757
var containerBuilder = new ContainerBuilder(container)
4858
secretNamesToMountPaths.foreach { case (name, path) =>
4959
containerBuilder = containerBuilder
@@ -53,7 +63,7 @@ private[spark] class MountSecretsBootstrap(secretNamesToMountPaths: Map[String,
5363
.endVolumeMount()
5464
}
5565

56-
(podBuilder.build(), containerBuilder.build())
66+
containerBuilder.build()
5767
}
5868

5969
private def secretVolumeName(secretName: String): String = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestrator.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ private[spark] class DriverConfigOrchestrator(
127127
Nil
128128
}
129129

130+
val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
131+
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
132+
} else {
133+
Nil
134+
}
135+
130136
val initContainerBootstrapStep = if (existNonContainerLocalFiles(sparkJars ++ sparkFiles)) {
131137
val orchestrator = new InitContainerConfigOrchestrator(
132138
sparkJars,
@@ -147,19 +153,13 @@ private[spark] class DriverConfigOrchestrator(
147153
Nil
148154
}
149155

150-
val mountSecretsStep = if (secretNamesToMountPaths.nonEmpty) {
151-
Seq(new DriverMountSecretsStep(new MountSecretsBootstrap(secretNamesToMountPaths)))
152-
} else {
153-
Nil
154-
}
155-
156156
Seq(
157157
initialSubmissionStep,
158158
serviceBootstrapStep,
159159
kubernetesCredentialsStep) ++
160160
dependencyResolutionStep ++
161-
initContainerBootstrapStep ++
162-
mountSecretsStep
161+
mountSecretsStep ++
162+
initContainerBootstrapStep
163163
}
164164

165165
private def existNonContainerLocalFiles(files: Seq[String]): Boolean = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ private[spark] class BasicDriverConfigurationStep(
119119
.endEnv()
120120
.addNewEnv()
121121
.withName(ENV_DRIVER_ARGS)
122-
.withValue(appArgs.map(arg => "\"" + arg + "\"").mkString(" "))
122+
.withValue(appArgs.mkString(" "))
123123
.endEnv()
124124
.addNewEnv()
125125
.withName(ENV_DRIVER_BIND_ADDRESS)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStep.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ private[spark] class DriverMountSecretsStep(
2828
bootstrap: MountSecretsBootstrap) extends DriverConfigurationStep {
2929

3030
override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
31-
val (pod, container) = bootstrap.mountSecrets(
32-
driverSpec.driverPod, driverSpec.driverContainer)
31+
val pod = bootstrap.addSecretVolumes(driverSpec.driverPod)
32+
val container = bootstrap.mountSecrets(driverSpec.driverContainer)
3333
driverSpec.copy(
3434
driverPod = pod,
3535
driverContainer = container

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,9 @@ private[spark] class InitContainerMountSecretsStep(
2828
bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep {
2929

3030
override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = {
31-
val (driverPod, initContainer) = bootstrap.mountSecrets(
32-
spec.driverPod,
33-
spec.initContainer)
34-
spec.copy(
35-
driverPod = driverPod,
36-
initContainer = initContainer
37-
)
31+
// Mount the secret volumes given that the volumes have already been added to the driver pod
32+
// when mounting the secrets into the main driver container.
33+
val initContainer = bootstrap.mountSecrets(spec.initContainer)
34+
spec.copy(initContainer = initContainer)
3835
}
3936
}

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private[spark] class ExecutorPodFactory(
214214

215215
val (maybeSecretsMountedPod, maybeSecretsMountedContainer) =
216216
mountSecretsBootstrap.map { bootstrap =>
217-
bootstrap.mountSecrets(executorPod, containerWithLimitCores)
217+
(bootstrap.addSecretVolumes(executorPod), bootstrap.mountSecrets(containerWithLimitCores))
218218
}.getOrElse((executorPod, containerWithLimitCores))
219219

220220
val (bootstrappedPod, bootstrappedContainer) =
@@ -227,7 +227,9 @@ private[spark] class ExecutorPodFactory(
227227

228228
val (pod, mayBeSecretsMountedInitContainer) =
229229
initContainerMountSecretsBootstrap.map { bootstrap =>
230-
bootstrap.mountSecrets(podWithInitContainer.pod, podWithInitContainer.initContainer)
230+
// Mount the secret volumes given that the volumes have already been added to the
231+
// executor pod when mounting the secrets into the main executor container.
232+
(podWithInitContainer.pod, bootstrap.mountSecrets(podWithInitContainer.initContainer))
231233
}.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer))
232234

233235
val bootstrappedPod = KubernetesUtils.appendInitContainer(
Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,23 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.deploy.k8s.submit
17+
package org.apache.spark.deploy.k8s
1818

1919
import scala.collection.JavaConverters._
2020

2121
import io.fabric8.kubernetes.api.model.{Container, Pod}
2222

2323
private[spark] object SecretVolumeUtils {
2424

25-
def podHasVolume(driverPod: Pod, volumeName: String): Boolean = {
26-
driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName)
25+
def podHasVolume(pod: Pod, volumeName: String): Boolean = {
26+
pod.getSpec.getVolumes.asScala.exists { volume =>
27+
volume.getName == volumeName
28+
}
2729
}
2830

29-
def containerHasVolume(
30-
driverContainer: Container,
31-
volumeName: String,
32-
mountPath: String): Boolean = {
33-
driverContainer.getVolumeMounts.asScala.exists(volumeMount =>
34-
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath)
31+
def containerHasVolume(container: Container, volumeName: String, mountPath: String): Boolean = {
32+
container.getVolumeMounts.asScala.exists { volumeMount =>
33+
volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath
34+
}
3535
}
3636
}

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
3333
private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
3434
private val APP_NAME = "spark-test"
3535
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
36-
private val APP_ARGS = Array("arg1", "arg2", "arg 3")
36+
private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
3737
private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
3838
private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
3939
private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
@@ -82,7 +82,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite {
8282
assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar")
8383
assert(envs(ENV_DRIVER_MEMORY) === "256M")
8484
assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
85-
assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"")
85+
assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2 \"arg 3\"")
8686
assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
8787
assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")
8888

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.apache.spark.deploy.k8s.submit.steps
1818

1919
import org.apache.spark.{SparkConf, SparkFunSuite}
20-
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
21-
import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils}
20+
import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
21+
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
2222

2323
class DriverMountSecretsStepSuite extends SparkFunSuite {
2424

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ package org.apache.spark.deploy.k8s.submit.steps.initcontainer
1919
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}
2020

2121
import org.apache.spark.SparkFunSuite
22-
import org.apache.spark.deploy.k8s.MountSecretsBootstrap
23-
import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils
22+
import org.apache.spark.deploy.k8s.{MountSecretsBootstrap, SecretVolumeUtils}
2423

2524
class InitContainerMountSecretsStepSuite extends SparkFunSuite {
2625

@@ -44,12 +43,8 @@ class InitContainerMountSecretsStepSuite extends SparkFunSuite {
4443
val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap)
4544
val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer(
4645
baseInitContainerSpec)
47-
48-
val podWithSecretsMounted = configuredInitContainerSpec.driverPod
4946
val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer
5047

51-
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
52-
assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName)))
5348
Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName =>
5449
assert(SecretVolumeUtils.containerHasVolume(
5550
initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)))

0 commit comments

Comments
 (0)