diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index da34a7e06238a..0bbf7e22fc64c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -54,6 +54,13 @@ private[spark] object Config extends Logging { .checkValues(Set("Always", "Never", "IfNotPresent")) .createWithDefault("IfNotPresent") + val IMAGE_PULL_SECRETS = + ConfigBuilder("spark.kubernetes.container.image.pullSecrets") + .doc("Comma separated list of the Kubernetes secrets used " + + "to access private image registries.") + .stringConf + .createOptional + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 5bc070147d3a8..5b2bb819cdb14 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.deploy.k8s +import io.fabric8.kubernetes.api.model.LocalObjectReference + import org.apache.spark.SparkConf import org.apache.spark.util.Utils @@ -35,6 +37,17 @@ private[spark] object KubernetesUtils { sparkConf.getAllWithPrefix(prefix).toMap } + /** + * Parses comma-separated list of imagePullSecrets into K8s-understandable format + */ + def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = { + imagePullSecrets match { + case Some(secretsCommaSeparated) => + secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList + case None => Nil + } + } + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala index 347c4d2d66826..ce022181a85d6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStep.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit.steps import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} +import io.fabric8.kubernetes.api.model._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ @@ -51,6 +51,8 @@ private[spark] class BasicDriverConfigurationStep( .get(DRIVER_CONTAINER_IMAGE) .getOrElse(throw new SparkException("Must specify the driver container image")) + private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS) + // CPU settings private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1") private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) @@ -132,6 +134,8 @@ private[spark] class BasicDriverConfigurationStep( case _ => driverContainerWithoutArgs.addToArgs(appArgs: _*).build() } + val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets) + val baseDriverPod = new PodBuilder(driverSpec.driverPod) .editOrNewMetadata() .withName(driverPodName) @@ -141,6 +145,7 @@ private[spark] class BasicDriverConfigurationStep( .withNewSpec() .withRestartPolicy("Never") .withNodeSelector(nodeSelector.asJava) + .withImagePullSecrets(parsedImagePullSecrets.asJava) .endSpec() .build() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 98cbd5607da00..8e85f4282d47a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -68,6 +68,7 @@ private[spark] class ExecutorPodFactory( .get(EXECUTOR_CONTAINER_IMAGE) .getOrElse(throw new SparkException("Must specify the executor container image")) private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY) + private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS) private val blockManagerPort = sparkConf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) @@ -98,6 +99,8 @@ private[spark] class ExecutorPodFactory( nodeToLocalTaskCount: Map[String, Int]): Pod = { val name = s"$executorPodNamePrefix-exec-$executorId" + val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets) + // hostname must be no longer than 63 characters, so take the last 63 characters of the pod // name as the hostname. This preserves uniqueness since the end of name contains // executorId @@ -193,6 +196,7 @@ private[spark] class ExecutorPodFactory( .withHostname(hostname) .withRestartPolicy("Never") .withNodeSelector(nodeSelector.asJava) + .withImagePullSecrets(parsedImagePullSecrets.asJava) .endSpec() .build() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala new file mode 100644 index 0000000000000..cf41b22e241af --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesUtilsTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.LocalObjectReference + +import org.apache.spark.SparkFunSuite + +class KubernetesUtilsTest extends SparkFunSuite { + + test("testParseImagePullSecrets") { + val noSecrets = KubernetesUtils.parseImagePullSecrets(None) + assert(noSecrets === Nil) + + val oneSecret = KubernetesUtils.parseImagePullSecrets(Some("imagePullSecret")) + assert(oneSecret === new LocalObjectReference("imagePullSecret") :: Nil) + + val commaSeparatedSecrets = KubernetesUtils.parseImagePullSecrets(Some("s1, s2 , s3,s4")) + assert(commaSeparatedSecrets.map(_.getName) === "s1" :: "s2" :: "s3" :: "s4" :: Nil) + } + +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala index ce068531c7673..580e80f30966b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala @@ -51,6 +51,7 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite { .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE) .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1") .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2") + .set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2") val submissionStep = new BasicDriverConfigurationStep( APP_ID, @@ -103,7 +104,12 @@ class BasicDriverConfigurationStepSuite extends SparkFunSuite { CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE, SPARK_APP_NAME_ANNOTATION -> APP_NAME) assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations) - assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never") + + val driverPodSpec = preparedDriverSpec.driverPod.getSpec + assert(driverPodSpec.getRestartPolicy === "Never") + assert(driverPodSpec.getImagePullSecrets.size() === 2) + assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1") + assert(driverPodSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2") val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap val expectedSparkConf = Map( diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 7755b93835047..318efce486b44 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -33,6 +33,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef private val driverPodUid: String = "driver-uid" private val executorPrefix: String = "base" private val executorImage: String = "executor-image" + private val imagePullSecrets: String = "imagePullSecret1, imagePullSecret2" private val driverPod = new PodBuilder() .withNewMetadata() .withName(driverPodName) @@ -54,6 +55,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) .set(CONTAINER_IMAGE, executorImage) .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true) + .set(IMAGE_PULL_SECRETS, imagePullSecrets) } test("basic executor pod has reasonable defaults") { @@ -74,6 +76,9 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1) assert(executor.getSpec.getContainers.get(0).getResources .getLimits.get("memory").getAmount === "1408Mi") + assert(executor.getSpec.getImagePullSecrets.size() === 2) + assert(executor.getSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1") + assert(executor.getSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2") // The pod has no node selector, volumes. assert(executor.getSpec.getNodeSelector.isEmpty)