From 1db618807377157aca55de2694441ea09f244e2d Mon Sep 17 00:00:00 2001 From: foxish Date: Thu, 14 Sep 2017 09:48:58 -0700 Subject: [PATCH 01/18] Unit test for executorpodfactory --- .../kubernetes/ExecutorPodFactorySuite.scala | 227 ++++++++++++++++++ 1 file changed, 227 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala new file mode 100644 index 0000000000000..cd81b11701dec --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.kubernetes + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient +import org.mockito.MockitoAnnotations +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.{constants, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl + +class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { + private val driverPodName: String = "driver-pod" + private val driverPodUid: String = "driver-uid" + private val driverUrl: String = "driver-url" + private val executorPrefix: String = "base" + private val executorImage: String = "executor-image" + private val driverPod = new PodBuilder() + .withNewMetadata() + .withName(driverPodName) + .withUid(driverPodUid) + .endMetadata() + .withNewSpec() + .withNodeName("some-node") + .endSpec() + .withNewStatus() + .withHostIP("192.168.99.100") + .endStatus() + .build() + private var baseConf: SparkConf = _ + private var sc: SparkContext = _ + + before { + SparkContext.clearActiveContext() + MockitoAnnotations.initMocks(this) + baseConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) + .set(EXECUTOR_DOCKER_IMAGE, executorImage) + sc = new SparkContext("local", "test") + } + private var kubernetesClient: KubernetesClient = _ + + test("basic executor pod has reasonable defaults") { + val factory = new ExecutorPodFactoryImpl(baseConf, + NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + val executor = factory.createExecutorPod("1", "dummy", "dummy", + Seq[(String, String)](), driverPod, Map[String, Int]()) + + // The executor pod name and default labels. + assert(executor.getMetadata.getName == s"$executorPrefix-exec-1") + assert(executor.getMetadata.getLabels.size() == 3) + + // There is exactly 1 container with no volume mounts and default memory limits. + // Default memory limit is 1024M + 384M (minimum overhead constant). + assert(executor.getSpec.getContainers.size() == 1) + assert(executor.getSpec.getContainers.get(0).getImage == executorImage) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 0) + assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() == 1) + assert(executor.getSpec.getContainers.get(0).getResources. + getLimits.get("memory").getAmount == "1408Mi") + + // The pod has no node selector, volumes. + assert(executor.getSpec.getNodeSelector.size() == 0) + assert(executor.getSpec.getVolumes.size() == 0) + + checkEnv(executor, Set()) + checkOwnerReferences(executor, driverPodUid) + } + + test("executor pod hostnames get truncated to 63 characters") { + val conf = baseConf.clone() + conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, + "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") + + val factory = new ExecutorPodFactoryImpl(conf, + NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + val executor = factory.createExecutorPod("1", + "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getSpec.getHostname.length == 63) + } + + test("secrets get mounted") { + val conf = baseConf.clone() + + val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1")) + val factory = new ExecutorPodFactoryImpl(conf, + NodeAffinityExecutorPodModifierImpl, Some(secretsBootstrap), None, None, None, None) + val executor = factory.createExecutorPod("1", + "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getSpec.getContainers.size() == 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "secret1-volume") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0). + getMountPath == "/var/secret1") + + // check volume mounted. + assert(executor.getSpec.getVolumes.size() == 1) + assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1") + + checkOwnerReferences(executor, driverPodUid) + } + + test("init-container bootstrap step adds an init container") { + val conf = baseConf.clone() + + val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl("init-image", + "IfNotPresent", "/some/path/", "some/other/path", 10, "config-map-name", "config-map-key") + val factory = new ExecutorPodFactoryImpl(conf, + NodeAffinityExecutorPodModifierImpl, None, None, Some(initContainerBootstrap), None, None) + val executor = factory.createExecutorPod("1", + "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getMetadata.getAnnotations.size() == 1) + assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION)) + checkOwnerReferences(executor, driverPodUid) + } + + test("the shuffle-service adds a volume mount") { + val conf = baseConf.clone() + conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value") + conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default") + + val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( + SparkTransportConf.fromSparkConf(conf, "shuffle"), + sc.env.securityManager, + sc.env.securityManager.isAuthenticationEnabled()) + val shuffleManager = new KubernetesExternalShuffleManagerImpl(conf, + kubernetesClient, kubernetesExternalShuffleClient) + val factory = new ExecutorPodFactoryImpl(conf, + NodeAffinityExecutorPodModifierImpl, None, None, None, None, Some(shuffleManager)) + val executor = factory.createExecutorPod("1", + "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + + assert(executor.getSpec.getContainers.size() == 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "0-tmp") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0). + getMountPath == "/tmp") + checkOwnerReferences(executor, driverPodUid) + } + + test("Small-files add a secret & secret volume mount to the container") { + val conf = baseConf.clone() + val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") + + val factory = new ExecutorPodFactoryImpl(conf, + NodeAffinityExecutorPodModifierImpl, None, Some(smallFiles), None, None, None) + val executor = factory.createExecutorPod("1", + "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + + assert(executor.getSpec.getContainers.size() == 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) + .getName == "submitted-files") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) + .getMountPath == "/var/secret1") + + assert(executor.getSpec.getVolumes.size() == 1) + assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1") + + checkOwnerReferences(executor, driverPodUid) + checkEnv(executor, Set()) + } + + test("classpath and extra java options get translated into environment variables") { + val conf = baseConf.clone() + conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") + conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") + + val factory = new ExecutorPodFactoryImpl(conf, + NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + val executor = factory.createExecutorPod("1", + "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + + checkEnv(executor, Set("SPARK_JAVA_OPT_0", "SPARK_EXECUTOR_EXTRA_CLASSPATH", "qux")) + checkOwnerReferences(executor, driverPodUid) + } + + // There is always exactly one controller reference, and it points to the driver pod. + private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { + assert(executor.getMetadata.getOwnerReferences.size() == 1) + assert(executor.getMetadata.getOwnerReferences.get(0).getUid == driverPodUid) + assert(executor.getMetadata.getOwnerReferences.get(0).getController == true) + } + + // Check that the expected environment variables are present. + private def checkEnv(executor: Pod, additionalEnvVars: Set[String]): Unit = { + val defaultEnvs = Set(constants.ENV_EXECUTOR_ID, + constants.ENV_DRIVER_URL, constants.ENV_EXECUTOR_CORES, + constants.ENV_EXECUTOR_MEMORY, constants.ENV_APPLICATION_ID, + constants.ENV_MOUNTED_CLASSPATH, constants.ENV_EXECUTOR_POD_IP, + constants.ENV_EXECUTOR_PORT) ++ additionalEnvVars + + assert(executor.getSpec.getContainers.size() == 1) + assert(executor.getSpec.getContainers.get(0).getEnv().size() == defaultEnvs.size) + val setEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map { + x => x.getName + }.toSet + assert(defaultEnvs == setEnvs) + } +} \ No newline at end of file From 810c4c4cfce06ea92f5446fd0981dd0a2892b113 Mon Sep 17 00:00:00 2001 From: foxish Date: Thu, 14 Sep 2017 18:00:14 -0700 Subject: [PATCH 02/18] Fix test --- .../scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index cd81b11701dec..1a890cc2118d6 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -143,6 +143,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val conf = baseConf.clone() conf.set(KUBERNETES_SHUFFLE_LABELS, "label=value") conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default") + conf.set(KUBERNETES_SHUFFLE_DIR, "/tmp") val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( SparkTransportConf.fromSparkConf(conf, "shuffle"), @@ -185,7 +186,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1") checkOwnerReferences(executor, driverPodUid) - checkEnv(executor, Set()) + checkEnv(executor, Set("SPARK_MOUNTED_FILES_FROM_SECRET_DIR")) } test("classpath and extra java options get translated into environment variables") { From 478ddc853aea307a8f56bc045c82145e15096a66 Mon Sep 17 00:00:00 2001 From: foxish Date: Wed, 20 Sep 2017 03:07:32 -0700 Subject: [PATCH 03/18] Indentation fix --- .../kubernetes/ExecutorPodFactorySuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index 1a890cc2118d6..fdb8aca2ebfab 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -38,15 +38,15 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { private val executorImage: String = "executor-image" private val driverPod = new PodBuilder() .withNewMetadata() - .withName(driverPodName) - .withUid(driverPodUid) - .endMetadata() + .withName(driverPodName) + .withUid(driverPodUid) + .endMetadata() .withNewSpec() - .withNodeName("some-node") - .endSpec() + .withNodeName("some-node") + .endSpec() .withNewStatus() - .withHostIP("192.168.99.100") - .endStatus() + .withHostIP("192.168.99.100") + .endStatus() .build() private var baseConf: SparkConf = _ private var sc: SparkContext = _ From e702cbd67148908809609543c8b66d24a96bb182 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 26 Sep 2017 02:49:31 -0700 Subject: [PATCH 04/18] Fix isEmpty and split between lines --- .../kubernetes/ExecutorPodFactorySuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index fdb8aca2ebfab..865068ff0eae2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -65,8 +65,8 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { test("basic executor pod has reasonable defaults") { val factory = new ExecutorPodFactoryImpl(baseConf, NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) - val executor = factory.createExecutorPod("1", "dummy", "dummy", - Seq[(String, String)](), driverPod, Map[String, Int]()) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) // The executor pod name and default labels. assert(executor.getMetadata.getName == s"$executorPrefix-exec-1") @@ -76,14 +76,14 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { // Default memory limit is 1024M + 384M (minimum overhead constant). assert(executor.getSpec.getContainers.size() == 1) assert(executor.getSpec.getContainers.get(0).getImage == executorImage) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 0) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty) assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() == 1) - assert(executor.getSpec.getContainers.get(0).getResources. - getLimits.get("memory").getAmount == "1408Mi") + assert(executor.getSpec.getContainers.get(0).getResources + .getLimits.get("memory").getAmount == "1408Mi") // The pod has no node selector, volumes. - assert(executor.getSpec.getNodeSelector.size() == 0) - assert(executor.getSpec.getVolumes.size() == 0) + assert(executor.getSpec.getNodeSelector.isEmpty) + assert(executor.getSpec.getVolumes.isEmpty) checkEnv(executor, Set()) checkOwnerReferences(executor, driverPodUid) @@ -114,8 +114,8 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { assert(executor.getSpec.getContainers.size() == 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "secret1-volume") - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0). - getMountPath == "/var/secret1") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) + .getMountPath == "/var/secret1") // check volume mounted. assert(executor.getSpec.getVolumes.size() == 1) @@ -160,8 +160,8 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { assert(executor.getSpec.getContainers.size() == 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "0-tmp") - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0). - getMountPath == "/tmp") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) + .getMountPath == "/tmp") checkOwnerReferences(executor, driverPodUid) } From fa330ede47c060491992a5ba552a80e882dabad2 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 26 Sep 2017 02:54:58 -0700 Subject: [PATCH 05/18] Address issues with multi-line code fragments --- .../kubernetes/ExecutorPodFactorySuite.scala | 97 +++++++++++++------ 1 file changed, 67 insertions(+), 30 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index 865068ff0eae2..dc539c4ada563 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -63,8 +63,14 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { private var kubernetesClient: KubernetesClient = _ test("basic executor pod has reasonable defaults") { - val factory = new ExecutorPodFactoryImpl(baseConf, - NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + val factory = new ExecutorPodFactoryImpl( + baseConf, + NodeAffinityExecutorPodModifierImpl, + None, + None, + None, + None, + None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -94,10 +100,10 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") - val factory = new ExecutorPodFactoryImpl(conf, - NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) - val executor = factory.createExecutorPod("1", - "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + val factory = new ExecutorPodFactoryImpl( + conf, NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getSpec.getHostname.length == 63) } @@ -106,10 +112,16 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val conf = baseConf.clone() val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1")) - val factory = new ExecutorPodFactoryImpl(conf, - NodeAffinityExecutorPodModifierImpl, Some(secretsBootstrap), None, None, None, None) - val executor = factory.createExecutorPod("1", - "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + val factory = new ExecutorPodFactoryImpl( + conf, + NodeAffinityExecutorPodModifierImpl, + Some(secretsBootstrap), + None, + None, + None, + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getSpec.getContainers.size() == 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) @@ -127,12 +139,25 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { test("init-container bootstrap step adds an init container") { val conf = baseConf.clone() - val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl("init-image", - "IfNotPresent", "/some/path/", "some/other/path", 10, "config-map-name", "config-map-key") - val factory = new ExecutorPodFactoryImpl(conf, - NodeAffinityExecutorPodModifierImpl, None, None, Some(initContainerBootstrap), None, None) - val executor = factory.createExecutorPod("1", - "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( + "init-image", + "IfNotPresent", + "/some/path/", + "some/other/path", + 10, + "config-map-name", + "config-map-key") + + val factory = new ExecutorPodFactoryImpl( + conf, + NodeAffinityExecutorPodModifierImpl, + None, + None, + Some(initContainerBootstrap), + None, + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getMetadata.getAnnotations.size() == 1) assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION)) @@ -149,12 +174,18 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { SparkTransportConf.fromSparkConf(conf, "shuffle"), sc.env.securityManager, sc.env.securityManager.isAuthenticationEnabled()) - val shuffleManager = new KubernetesExternalShuffleManagerImpl(conf, - kubernetesClient, kubernetesExternalShuffleClient) - val factory = new ExecutorPodFactoryImpl(conf, - NodeAffinityExecutorPodModifierImpl, None, None, None, None, Some(shuffleManager)) - val executor = factory.createExecutorPod("1", - "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + val shuffleManager = new KubernetesExternalShuffleManagerImpl( + conf, kubernetesClient, kubernetesExternalShuffleClient) + val factory = new ExecutorPodFactoryImpl( + conf, + NodeAffinityExecutorPodModifierImpl, + None, + None, + None, + None, + Some(shuffleManager)) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getSpec.getContainers.size() == 1) @@ -169,10 +200,16 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val conf = baseConf.clone() val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") - val factory = new ExecutorPodFactoryImpl(conf, - NodeAffinityExecutorPodModifierImpl, None, Some(smallFiles), None, None, None) - val executor = factory.createExecutorPod("1", - "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + val factory = new ExecutorPodFactoryImpl( + conf, + NodeAffinityExecutorPodModifierImpl, + None, + Some(smallFiles), + None, + None, + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) assert(executor.getSpec.getContainers.size() == 1) @@ -194,10 +231,10 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") - val factory = new ExecutorPodFactoryImpl(conf, - NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) - val executor = factory.createExecutorPod("1", - "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + val factory = new ExecutorPodFactoryImpl( + conf, NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) checkEnv(executor, Set("SPARK_JAVA_OPT_0", "SPARK_EXECUTOR_EXTRA_CLASSPATH", "qux")) checkOwnerReferences(executor, driverPodUid) From 6ddb01e0503c2e93651a6677e4562ebcd378f238 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 26 Sep 2017 02:57:15 -0700 Subject: [PATCH 06/18] Replace == with === --- .../kubernetes/ExecutorPodFactorySuite.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index dc539c4ada563..b08d6bb3029c1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -75,17 +75,17 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) // The executor pod name and default labels. - assert(executor.getMetadata.getName == s"$executorPrefix-exec-1") - assert(executor.getMetadata.getLabels.size() == 3) + assert(executor.getMetadata.getName === s"$executorPrefix-exec-1") + assert(executor.getMetadata.getLabels.size() === 3) // There is exactly 1 container with no volume mounts and default memory limits. // Default memory limit is 1024M + 384M (minimum overhead constant). - assert(executor.getSpec.getContainers.size() == 1) - assert(executor.getSpec.getContainers.get(0).getImage == executorImage) + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getImage === executorImage) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.isEmpty) - assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() == 1) + assert(executor.getSpec.getContainers.get(0).getResources.getLimits.size() === 1) assert(executor.getSpec.getContainers.get(0).getResources - .getLimits.get("memory").getAmount == "1408Mi") + .getLimits.get("memory").getAmount === "1408Mi") // The pod has no node selector, volumes. assert(executor.getSpec.getNodeSelector.isEmpty) @@ -105,7 +105,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) - assert(executor.getSpec.getHostname.length == 63) + assert(executor.getSpec.getHostname.length === 63) } test("secrets get mounted") { @@ -123,15 +123,15 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) - assert(executor.getSpec.getContainers.size() == 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "secret1-volume") + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "secret1-volume") assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) - .getMountPath == "/var/secret1") + .getMountPath === "/var/secret1") // check volume mounted. - assert(executor.getSpec.getVolumes.size() == 1) - assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1") + assert(executor.getSpec.getVolumes.size() === 1) + assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1") checkOwnerReferences(executor, driverPodUid) } @@ -159,7 +159,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) - assert(executor.getMetadata.getAnnotations.size() == 1) + assert(executor.getMetadata.getAnnotations.size() === 1) assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION)) checkOwnerReferences(executor, driverPodUid) } @@ -188,11 +188,11 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) - assert(executor.getSpec.getContainers.size() == 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName == "0-tmp") + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "0-tmp") assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) - .getMountPath == "/tmp") + .getMountPath === "/tmp") checkOwnerReferences(executor, driverPodUid) } @@ -212,15 +212,15 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) - assert(executor.getSpec.getContainers.size() == 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() == 1) + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) - .getName == "submitted-files") + .getName === "submitted-files") assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) - .getMountPath == "/var/secret1") + .getMountPath === "/var/secret1") - assert(executor.getSpec.getVolumes.size() == 1) - assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName == "secret1") + assert(executor.getSpec.getVolumes.size() === 1) + assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1") checkOwnerReferences(executor, driverPodUid) checkEnv(executor, Set("SPARK_MOUNTED_FILES_FROM_SECRET_DIR")) @@ -242,9 +242,9 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { - assert(executor.getMetadata.getOwnerReferences.size() == 1) - assert(executor.getMetadata.getOwnerReferences.get(0).getUid == driverPodUid) - assert(executor.getMetadata.getOwnerReferences.get(0).getController == true) + assert(executor.getMetadata.getOwnerReferences.size() === 1) + assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid) + assert(executor.getMetadata.getOwnerReferences.get(0).getController === true) } // Check that the expected environment variables are present. @@ -255,11 +255,11 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { constants.ENV_MOUNTED_CLASSPATH, constants.ENV_EXECUTOR_POD_IP, constants.ENV_EXECUTOR_PORT) ++ additionalEnvVars - assert(executor.getSpec.getContainers.size() == 1) - assert(executor.getSpec.getContainers.get(0).getEnv().size() == defaultEnvs.size) + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getEnv().size() === defaultEnvs.size) val setEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map { x => x.getName }.toSet - assert(defaultEnvs == setEnvs) + assert(defaultEnvs === setEnvs) } } \ No newline at end of file From 13a2dd5608a1f180b40860293df3fb2ad926cf97 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Wed, 27 Sep 2017 18:49:12 -0700 Subject: [PATCH 07/18] mock shuffleManager --- .../kubernetes/ExecutorPodFactorySuite.scala | 33 +++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index b08d6bb3029c1..726d4308e5973 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -20,8 +20,15 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.KubernetesClient -import org.mockito.MockitoAnnotations +import io.fabric8.kubernetes.api.model.{Pod, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} + import org.scalatest.BeforeAndAfter +import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.Matchers.{any, eq => mockitoEq} +import org.mockito.Mockito.{doNothing, never, times, verify, when, mock} +import org.scalatest.mock.MockitoSugar._ + +import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.deploy.kubernetes.{constants, SparkPodInitContainerBootstrapImpl} @@ -49,7 +56,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { .endStatus() .build() private var baseConf: SparkConf = _ - private var sc: SparkContext = _ + //private var sc: SparkContext = mock(classOf[SparkContext]) before { SparkContext.clearActiveContext() @@ -58,7 +65,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) .set(EXECUTOR_DOCKER_IMAGE, executorImage) - sc = new SparkContext("local", "test") + //sc = new SparkContext("local", "test") } private var kubernetesClient: KubernetesClient = _ @@ -170,12 +177,32 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default") conf.set(KUBERNETES_SHUFFLE_DIR, "/tmp") +/* val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( SparkTransportConf.fromSparkConf(conf, "shuffle"), sc.env.securityManager, sc.env.securityManager.isAuthenticationEnabled()) val shuffleManager = new KubernetesExternalShuffleManagerImpl( conf, kubernetesClient, kubernetesExternalShuffleClient) +*/ + + val shuffleManager = mock(classOf[KubernetesExternalShuffleManager]) + when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({ + val shuffleDirs = Seq("/tmp") + shuffleDirs.zipWithIndex.map { case (shuffleDir, shuffleDirIndex) => + val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" + val volume = new VolumeBuilder() + .withName(volumeName) + .withNewHostPath(shuffleDir) + .build() + val volumeMount = new VolumeMountBuilder() + .withName(volumeName) + .withMountPath(shuffleDir) + .build() + (volume, volumeMount) + } + }) + val factory = new ExecutorPodFactoryImpl( conf, NodeAffinityExecutorPodModifierImpl, From 3f769cad31638c332bbc2a8f91eb9264e83bbffb Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 14:31:02 -0700 Subject: [PATCH 08/18] .kubernetes. => .k8s. --- .../cluster/kubernetes/ExecutorPodFactorySuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala index 726d4308e5973..bd9a93b8c9a98 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala @@ -31,11 +31,11 @@ import org.scalatest.mock.MockitoSugar._ import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.deploy.kubernetes.{constants, SparkPodInitContainerBootstrapImpl} -import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl +import org.apache.spark.deploy.k8s.{constants, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { private val driverPodName: String = "driver-pod" @@ -289,4 +289,4 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { }.toSet assert(defaultEnvs === setEnvs) } -} \ No newline at end of file +} From 6171267768af5aa30bd196ec30f58056a6aab1d1 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 14:32:26 -0700 Subject: [PATCH 09/18] move to k8s subdir --- .../cluster/{kubernetes => k8s}/ExecutorPodFactorySuite.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/{kubernetes => k8s}/ExecutorPodFactorySuite.scala (100%) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala similarity index 100% rename from resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/kubernetes/ExecutorPodFactorySuite.scala rename to resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala From 47ece8763386daee051124bc9e5e070a7161436a Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 14:39:31 -0700 Subject: [PATCH 10/18] fix package clause to k8s --- .../spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index bd9a93b8c9a98..ae556a53828a5 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.scheduler.cluster.kubernetes +package org.apache.spark.scheduler.cluster.k8s import scala.collection.JavaConverters._ From 67a99d9a3cc83b655c9a1cb981f914574606e417 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 15:21:29 -0700 Subject: [PATCH 11/18] mock nodeAffinityExecutorPodModifier --- .../cluster/k8s/ExecutorPodFactorySuite.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index ae556a53828a5..3c64bd52102df 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -56,7 +56,11 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { .endStatus() .build() private var baseConf: SparkConf = _ - //private var sc: SparkContext = mock(classOf[SparkContext]) + + private val nodeAffinityExecutorPodModifier = mock(classOf[NodeAffinityExecutorPodModifier]) + when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( + any(classOf[Pod]), + any(classOf[Map[String, Int]]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) before { SparkContext.clearActiveContext() @@ -65,14 +69,13 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) .set(EXECUTOR_DOCKER_IMAGE, executorImage) - //sc = new SparkContext("local", "test") } private var kubernetesClient: KubernetesClient = _ test("basic executor pod has reasonable defaults") { val factory = new ExecutorPodFactoryImpl( baseConf, - NodeAffinityExecutorPodModifierImpl, + nodeAffinityExecutorPodModifier, None, None, None, @@ -108,7 +111,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") val factory = new ExecutorPodFactoryImpl( - conf, NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + conf, nodeAffinityExecutorPodModifier, None, None, None, None, None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -121,7 +124,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val secretsBootstrap = new MountSecretsBootstrapImpl(Map("secret1" -> "/var/secret1")) val factory = new ExecutorPodFactoryImpl( conf, - NodeAffinityExecutorPodModifierImpl, + nodeAffinityExecutorPodModifier, Some(secretsBootstrap), None, None, @@ -157,7 +160,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val factory = new ExecutorPodFactoryImpl( conf, - NodeAffinityExecutorPodModifierImpl, + nodeAffinityExecutorPodModifier, None, None, Some(initContainerBootstrap), @@ -205,7 +208,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val factory = new ExecutorPodFactoryImpl( conf, - NodeAffinityExecutorPodModifierImpl, + nodeAffinityExecutorPodModifier, None, None, None, @@ -229,7 +232,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val factory = new ExecutorPodFactoryImpl( conf, - NodeAffinityExecutorPodModifierImpl, + nodeAffinityExecutorPodModifier, None, Some(smallFiles), None, @@ -259,7 +262,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") val factory = new ExecutorPodFactoryImpl( - conf, NodeAffinityExecutorPodModifierImpl, None, None, None, None, None) + conf, nodeAffinityExecutorPodModifier, None, None, None, None, None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) From 4f0beb57aedf48544edb9f17b26e3eb60e93a6dd Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 16:07:08 -0700 Subject: [PATCH 12/18] remove commented code --- .../scheduler/cluster/k8s/ExecutorPodFactorySuite.scala | 9 --------- 1 file changed, 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 3c64bd52102df..ab0ef446c9b4d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -180,15 +180,6 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { conf.set(KUBERNETES_SHUFFLE_NAMESPACE, "default") conf.set(KUBERNETES_SHUFFLE_DIR, "/tmp") -/* - val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl( - SparkTransportConf.fromSparkConf(conf, "shuffle"), - sc.env.securityManager, - sc.env.securityManager.isAuthenticationEnabled()) - val shuffleManager = new KubernetesExternalShuffleManagerImpl( - conf, kubernetesClient, kubernetesExternalShuffleClient) -*/ - val shuffleManager = mock(classOf[KubernetesExternalShuffleManager]) when(shuffleManager.getExecutorShuffleDirVolumesWithMounts).thenReturn({ val shuffleDirs = Seq("/tmp") From 7014961b20c6955c9ea2ba5c63efb658b1cdcf85 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 16:25:49 -0700 Subject: [PATCH 13/18] move when clause to before{} block --- .../scheduler/cluster/k8s/ExecutorPodFactorySuite.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index ab0ef446c9b4d..4eb24117e7880 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -58,17 +58,16 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { private var baseConf: SparkConf = _ private val nodeAffinityExecutorPodModifier = mock(classOf[NodeAffinityExecutorPodModifier]) - when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( - any(classOf[Pod]), - any(classOf[Map[String, Int]]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) before { - SparkContext.clearActiveContext() MockitoAnnotations.initMocks(this) baseConf = new SparkConf() .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) .set(EXECUTOR_DOCKER_IMAGE, executorImage) + when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( + any(classOf[Pod]), + any(classOf[Map[String, Int]]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) } private var kubernetesClient: KubernetesClient = _ From 8627cd2737dfa2d219bf2ac067cad5fde3505681 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 28 Sep 2017 19:09:11 -0700 Subject: [PATCH 14/18] mock initContainerBootstrap, smallFiles --- .../cluster/k8s/ExecutorPodFactorySuite.scala | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 4eb24117e7880..09fe08ec2165d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -26,6 +26,8 @@ import org.scalatest.BeforeAndAfter import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Matchers.{any, eq => mockitoEq} import org.mockito.Mockito.{doNothing, never, times, verify, when, mock} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.mock.MockitoSugar._ import org.apache.commons.io.FilenameUtils @@ -33,9 +35,9 @@ import org.apache.commons.io.FilenameUtils import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl -import org.apache.spark.deploy.k8s.{constants, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.k8s.{constants, SparkPodInitContainerBootstrapImpl, SparkPodInitContainerBootstrap, PodWithDetachedInitContainer} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} +import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl, MountSmallFilesBootstrap} class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { private val driverPodName: String = "driver-pod" @@ -56,7 +58,6 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { .endStatus() .build() private var baseConf: SparkConf = _ - private val nodeAffinityExecutorPodModifier = mock(classOf[NodeAffinityExecutorPodModifier]) before { @@ -148,6 +149,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { test("init-container bootstrap step adds an init container") { val conf = baseConf.clone() +/* val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( "init-image", "IfNotPresent", @@ -156,6 +158,11 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { 10, "config-map-name", "config-map-key") +*/ + + val initContainerBootstrap = mock(classOf[SparkPodInitContainerBootstrap]) + when(initContainerBootstrap.bootstrapInitContainerAndVolumes( + any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) val factory = new ExecutorPodFactoryImpl( conf, @@ -218,7 +225,20 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { test("Small-files add a secret & secret volume mount to the container") { val conf = baseConf.clone() - val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") + + //val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") + val smallFiles = mock(classOf[MountSmallFilesBootstrap]) + when(smallFiles.mountSmallFilesSecret( + any(classOf[Pod]), + any(classOf[Container]))).thenAnswer(new Answer[(Pod, Container)] { + def answer(invocation: InvocationOnMock): (Pod, Container) = { + val pod = invocation.getArgumentAt(0, classOf[Pod]) + val container = invocation.getArgumentAt(1, classOf[Container]) + val secretName = "secret1" + val secretMountPath = "/var/secret1" + (pod, container) + } + }) val factory = new ExecutorPodFactoryImpl( conf, From 36f5c2558e5e28e1a12742e5d54e8556daab2bb1 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Fri, 29 Sep 2017 05:47:03 -0700 Subject: [PATCH 15/18] insert actual logic into smallFiles mock --- .../cluster/k8s/ExecutorPodFactorySuite.scala | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 09fe08ec2165d..72fe16a9c580f 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -236,7 +236,27 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val container = invocation.getArgumentAt(1, classOf[Container]) val secretName = "secret1" val secretMountPath = "/var/secret1" - (pod, container) + val resolvedPod = new PodBuilder(pod) + .editOrNewSpec() + .addNewVolume() + .withName("submitted-files") + .withNewSecret() + .withSecretName(secretName) + .endSecret() + .endVolume() + .endSpec() + .build() + val resolvedContainer = new ContainerBuilder(container) + .addNewEnv() + .withName(constants.ENV_MOUNTED_FILES_FROM_SECRET_DIR) + .withValue(secretMountPath) + .endEnv() + .addNewVolumeMount() + .withName("submitted-files") + .withMountPath(secretMountPath) + .endVolumeMount() + .build() + (resolvedPod, resolvedContainer) } }) From 8fc10e6e133375621326103c59f7d295012c8b45 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Mon, 2 Oct 2017 15:06:41 -0700 Subject: [PATCH 16/18] verify application of nodeAffinityExecutorPodModifier --- .../cluster/k8s/ExecutorPodFactorySuite.scala | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 72fe16a9c580f..d86f89f0be1e7 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -84,6 +84,9 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + // The executor pod name and default labels. assert(executor.getMetadata.getName === s"$executorPrefix-exec-1") assert(executor.getMetadata.getLabels.size() === 3) @@ -115,6 +118,9 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + assert(executor.getSpec.getHostname.length === 63) } @@ -133,6 +139,9 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "secret1-volume") @@ -175,6 +184,9 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + assert(executor.getMetadata.getAnnotations.size() === 1) assert(executor.getMetadata.getAnnotations.containsKey(constants.INIT_CONTAINER_ANNOTATION)) checkOwnerReferences(executor, driverPodUid) @@ -214,6 +226,8 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) @@ -271,6 +285,8 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) @@ -296,6 +312,9 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + verify(nodeAffinityExecutorPodModifier, times(1)) + .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) + checkEnv(executor, Set("SPARK_JAVA_OPT_0", "SPARK_EXECUTOR_EXTRA_CLASSPATH", "qux")) checkOwnerReferences(executor, driverPodUid) } From 0892830ae5401034fd97758c80f5201020213d30 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Mon, 2 Oct 2017 16:34:10 -0700 Subject: [PATCH 17/18] avoid cumulative invocation --- .../cluster/k8s/ExecutorPodFactorySuite.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index d86f89f0be1e7..6725261b93d68 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -22,12 +22,13 @@ import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.api.model.{Pod, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Matchers.{any, eq => mockitoEq} -import org.mockito.Mockito.{doNothing, never, times, verify, when, mock} +import org.mockito.Mockito.{doNothing, never, times, verify, when, mock, reset} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer + import org.scalatest.mock.MockitoSugar._ import org.apache.commons.io.FilenameUtils @@ -39,7 +40,7 @@ import org.apache.spark.deploy.k8s.{constants, SparkPodInitContainerBootstrapImp import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl, MountSmallFilesBootstrap} -class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { +class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { private val driverPodName: String = "driver-pod" private val driverPodUid: String = "driver-uid" private val driverUrl: String = "driver-url" @@ -66,11 +67,15 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter { .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, executorPrefix) .set(EXECUTOR_DOCKER_IMAGE, executorImage) + } + private var kubernetesClient: KubernetesClient = _ + + override def beforeEach(cmap: org.scalatest.ConfigMap) { + reset(nodeAffinityExecutorPodModifier) when(nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( any(classOf[Pod]), any(classOf[Map[String, Int]]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) } - private var kubernetesClient: KubernetesClient = _ test("basic executor pod has reasonable defaults") { val factory = new ExecutorPodFactoryImpl( From e278aca8298d2c5570f3a49e47642266a99be0b2 Mon Sep 17 00:00:00 2001 From: foxish Date: Mon, 9 Oct 2017 20:32:22 -0700 Subject: [PATCH 18/18] Fixed env-var check to include values, removed mock for small files --- .../cluster/k8s/ExecutorPodFactorySuite.scala | 108 ++++++------------ 1 file changed, 32 insertions(+), 76 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 6725261b93d68..6690217557637 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -18,29 +18,22 @@ package org.apache.spark.scheduler.cluster.k8s import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.{Pod, VolumeBuilder, VolumeMountBuilder, _} import io.fabric8.kubernetes.client.KubernetesClient -import io.fabric8.kubernetes.api.model.{Pod, Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} - -import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} -import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} +import org.apache.commons.io.FilenameUtils +import org.mockito.{AdditionalAnswers, MockitoAnnotations} import org.mockito.Matchers.{any, eq => mockitoEq} -import org.mockito.Mockito.{doNothing, never, times, verify, when, mock, reset} +import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} -import org.scalatest.mock.MockitoSugar._ - -import org.apache.commons.io.FilenameUtils - -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} -import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl -import org.apache.spark.deploy.k8s.{constants, SparkPodInitContainerBootstrapImpl, SparkPodInitContainerBootstrap, PodWithDetachedInitContainer} +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{constants, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl, MountSmallFilesBootstrap} +import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrap, MountSmallFilesBootstrapImpl} -class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { +class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { private val driverPodName: String = "driver-pod" private val driverPodUid: String = "driver-uid" private val driverUrl: String = "driver-url" @@ -109,7 +102,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with assert(executor.getSpec.getNodeSelector.isEmpty) assert(executor.getSpec.getVolumes.isEmpty) - checkEnv(executor, Set()) + checkEnv(executor, Map()) checkOwnerReferences(executor, driverPodUid) } @@ -149,7 +142,8 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) - assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName === "secret1-volume") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName + === "secret1-volume") assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) .getMountPath === "/var/secret1") @@ -162,18 +156,6 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with test("init-container bootstrap step adds an init container") { val conf = baseConf.clone() - -/* - val initContainerBootstrap = new SparkPodInitContainerBootstrapImpl( - "init-image", - "IfNotPresent", - "/some/path/", - "some/other/path", - 10, - "config-map-name", - "config-map-key") -*/ - val initContainerBootstrap = mock(classOf[SparkPodInitContainerBootstrap]) when(initContainerBootstrap.bootstrapInitContainerAndVolumes( any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) @@ -245,40 +227,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with test("Small-files add a secret & secret volume mount to the container") { val conf = baseConf.clone() - //val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") - val smallFiles = mock(classOf[MountSmallFilesBootstrap]) - when(smallFiles.mountSmallFilesSecret( - any(classOf[Pod]), - any(classOf[Container]))).thenAnswer(new Answer[(Pod, Container)] { - def answer(invocation: InvocationOnMock): (Pod, Container) = { - val pod = invocation.getArgumentAt(0, classOf[Pod]) - val container = invocation.getArgumentAt(1, classOf[Container]) - val secretName = "secret1" - val secretMountPath = "/var/secret1" - val resolvedPod = new PodBuilder(pod) - .editOrNewSpec() - .addNewVolume() - .withName("submitted-files") - .withNewSecret() - .withSecretName(secretName) - .endSecret() - .endVolume() - .endSpec() - .build() - val resolvedContainer = new ContainerBuilder(container) - .addNewEnv() - .withName(constants.ENV_MOUNTED_FILES_FROM_SECRET_DIR) - .withValue(secretMountPath) - .endEnv() - .addNewVolumeMount() - .withName("submitted-files") - .withMountPath(secretMountPath) - .endVolumeMount() - .build() - (resolvedPod, resolvedContainer) - } - }) - + val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") val factory = new ExecutorPodFactoryImpl( conf, nodeAffinityExecutorPodModifier, @@ -304,7 +253,7 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1") checkOwnerReferences(executor, driverPodUid) - checkEnv(executor, Set("SPARK_MOUNTED_FILES_FROM_SECRET_DIR")) + checkEnv(executor, Map("SPARK_MOUNTED_FILES_FROM_SECRET_DIR" -> "/var/secret1")) } test("classpath and extra java options get translated into environment variables") { @@ -320,7 +269,10 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with verify(nodeAffinityExecutorPodModifier, times(1)) .addNodeAffinityAnnotationIfUseful(any(classOf[Pod]), any(classOf[Map[String, Int]])) - checkEnv(executor, Set("SPARK_JAVA_OPT_0", "SPARK_EXECUTOR_EXTRA_CLASSPATH", "qux")) + checkEnv(executor, + Map("SPARK_JAVA_OPT_0" -> "foo=bar", + "SPARK_EXECUTOR_EXTRA_CLASSPATH" -> "bar=baz", + "qux" -> "quux")) checkOwnerReferences(executor, driverPodUid) } @@ -332,18 +284,22 @@ class ExecutorPodFactoryImplSuite extends SparkFunSuite with BeforeAndAfter with } // Check that the expected environment variables are present. - private def checkEnv(executor: Pod, additionalEnvVars: Set[String]): Unit = { - val defaultEnvs = Set(constants.ENV_EXECUTOR_ID, - constants.ENV_DRIVER_URL, constants.ENV_EXECUTOR_CORES, - constants.ENV_EXECUTOR_MEMORY, constants.ENV_APPLICATION_ID, - constants.ENV_MOUNTED_CLASSPATH, constants.ENV_EXECUTOR_POD_IP, - constants.ENV_EXECUTOR_PORT) ++ additionalEnvVars + private def checkEnv(executor: Pod, additionalEnvVars: Map[String, String]): Unit = { + val defaultEnvs = Map( + constants.ENV_EXECUTOR_ID -> "1", + constants.ENV_DRIVER_URL -> "dummy", + constants.ENV_EXECUTOR_CORES -> "1", + constants.ENV_EXECUTOR_MEMORY -> "1g", + constants.ENV_APPLICATION_ID -> "dummy", + constants.ENV_MOUNTED_CLASSPATH -> "/var/spark-data/spark-jars/*", + constants.ENV_EXECUTOR_POD_IP -> null, + constants.ENV_EXECUTOR_PORT -> "10000") ++ additionalEnvVars assert(executor.getSpec.getContainers.size() === 1) assert(executor.getSpec.getContainers.get(0).getEnv().size() === defaultEnvs.size) - val setEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map { - x => x.getName - }.toSet - assert(defaultEnvs === setEnvs) + val mapEnvs = executor.getSpec.getContainers.get(0).getEnv.asScala.map { + x => (x.getName, x.getValue) + }.toMap + assert(defaultEnvs === mapEnvs) } }