diff --git a/e2e/runner.sh b/e2e/runner.sh
index 8acecc2..bdbcb54 100755
--- a/e2e/runner.sh
+++ b/e2e/runner.sh
@@ -112,6 +112,7 @@ $root/spark/build/mvn clean -Ddownload.plugin.skip=true integration-test \
   -Dspark-distro-tgz=$root/spark/*.tgz \
   -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://$MASTER \
   -Dspark.docker.test.driverImage=$IMAGE_REPO/spark-driver:$tag \
-  -Dspark.docker.test.executorImage=$IMAGE_REPO/spark-executor:$tag" || :
+  -Dspark.docker.test.executorImage=$IMAGE_REPO/spark-executor:$tag \
+  -Dspark.docker.test.initContainerImage=$IMAGE_REPO/spark-init:$tag" || :
 
 echo "TEST SUITE FINISHED"
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index f2d9adb..af09ce5 100644
--- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -24,7 +24,7 @@ import java.util.regex.Pattern
 
 import scala.collection.JavaConverters._
 import com.google.common.io.PatternFilenameFilter
-import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.{Container, Pod}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
@@ -52,6 +52,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   before {
     sparkAppConf = kubernetesTestComponents.newSparkAppConf()
       .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
+      .set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
     kubernetesTestComponents.createNamespace()
   }
 
@@ -70,10 +71,25 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
 
   test("Run SparkPi with a master URL without a scheme.") {
     val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
-    sparkAppConf.set("spark.master", s"k8s://${url.getHost}:${url.getPort}")
+    val k8sMasterUrl = if (url.getPort < 0) {
+      s"k8s://${url.getHost}"
+    } else {
+      s"k8s://${url.getHost}:${url.getPort}"
+    }
+    sparkAppConf.set("spark.master", k8sMasterUrl)
     runSparkPiAndVerifyCompletion()
   }
 
+  test("Run SparkPi with an argument.") {
+    runSparkPiAndVerifyCompletion(appArgs = Array("5"))
+  }
+
+  test("Run SparkPi using the remote example jar.") {
+    sparkAppConf.set("spark.kubernetes.initContainer.image",
+      System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest"))
+    runSparkPiAndVerifyCompletion(appResource = REMOTE_EXAMPLES_JAR_URI)
+  }
+
   test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
     sparkAppConf
       .set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
@@ -83,37 +99,75 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
       .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
       .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
-    runSparkPiAndVerifyCompletion(driverPodChecker = (driverPod: Pod) => {
-      doBasicDriverPodCheck(driverPod)
-      assert(driverPod.getMetadata.getName === "spark-integration-spark-pi")
-
-      assert(driverPod.getMetadata.getLabels.get("label1") === "label1-value")
-      assert(driverPod.getMetadata.getLabels.get("label2") === "label2-value")
-      assert(driverPod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
-      assert(driverPod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
-
-      val driverContainer = driverPod.getSpec.getContainers.get(0)
-      val envVars = driverContainer
-        .getEnv
-        .asScala
-        .map { env =>
-          (env.getName, env.getValue)
-        }
-        .toMap
-      assert(envVars("ENV1") === "VALUE1")
-      assert(envVars("ENV2") === "VALUE2")
-    })
+      .set("spark.kubernetes.executor.label.label1", "label1-value")
+      .set("spark.kubernetes.executor.label.label2", "label2-value")
+      .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
+      .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
+      .set("spark.executorEnv.ENV1", "VALUE1")
+      .set("spark.executorEnv.ENV2", "VALUE2")
+
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        assert(driverPod.getMetadata.getName === "spark-integration-spark-pi")
+        checkCustomSettings(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkCustomSettings(executorPod)
+      })
+  }
+
+  test("Run SparkPi with a test secret mounted into the driver and executor pods") {
+    createTestSecret()
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkTestSecret(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkTestSecret(executorPod)
+      })
+  }
+
+  test("Run SparkPi using the remote example jar with a test secret mounted into the driver and " +
+    "executor pods") {
+    sparkAppConf
+      .set(s"spark.kubernetes.driver.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+      .set(s"spark.kubernetes.executor.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
+    sparkAppConf.set("spark.kubernetes.initContainer.image",
+      System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest"))
+
+    createTestSecret()
+
+    runSparkPiAndVerifyCompletion(
+      appResource = REMOTE_EXAMPLES_JAR_URI,
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkTestSecret(driverPod, withInitContainer = true)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkTestSecret(executorPod, withInitContainer = true)
+      })
   }
 
   private def runSparkPiAndVerifyCompletion(
       appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
-      driverPodChecker: Pod => Unit = doBasicDriverPodCheck): Unit = {
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String]): Unit = {
     runSparkApplicationAndVerifyCompletion(
       appResource,
       SPARK_PI_MAIN_CLASS,
       Seq("Pi is roughly 3"),
-      Array.empty[String],
-      driverPodChecker)
+      appArgs,
+      driverPodChecker,
+      executorPodChecker)
   }
 
   private def runSparkApplicationAndVerifyCompletion(
@@ -121,18 +175,33 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
       mainClass: String,
       expectedLogOnCompletion: Seq[String],
       appArgs: Array[String],
-      driverPodChecker: Pod => Unit): Unit = {
+      driverPodChecker: Pod => Unit,
+      executorPodChecker: Pod => Unit): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
-      mainClass = mainClass)
+      mainClass = mainClass,
+      appArgs = appArgs)
     SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt)
+
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()
       .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
+      .withLabel("spark-role", "driver")
       .list()
       .getItems
       .get(0)
     driverPodChecker(driverPod)
+
+    val executorPods = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", APP_LOCATOR_LABEL)
+      .withLabel("spark-role", "executor")
+      .list()
+      .getItems
+    executorPods.asScala.foreach { pod =>
+      executorPodChecker(pod)
+    }
+
     Eventually.eventually(TIMEOUT, INTERVAL) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
@@ -145,7 +214,64 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
   }
 
   private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
-    assert(driverPod.getMetadata.getLabels.get("spark-role") === "driver")
+    assert(driverPod.getSpec.getContainers.get(0).getImage === "spark-driver:latest")
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
+  private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === "spark-executor:latest")
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
+  private def checkCustomSettings(pod: Pod): Unit = {
+    assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
+    assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
+
+    val container = pod.getSpec.getContainers.get(0)
+    val envVars = container
+      .getEnv
+      .asScala
+      .map { env =>
+        (env.getName, env.getValue)
+      }
+      .toMap
+    assert(envVars("ENV1") === "VALUE1")
+    assert(envVars("ENV2") === "VALUE2")
+  }
+
+  private def createTestSecret(): Unit = {
+    testBackend.getKubernetesClient.secrets
+      .createNew()
+      .editOrNewMetadata()
+        .withName(TEST_SECRET_NAME)
+        .withNamespace(kubernetesTestComponents.namespace)
+        .endMetadata()
+      .addToStringData(TEST_SECRET_KEY, TEST_SECRET_VALUE)
+      .done()
+  }
+
+  private def checkTestSecret(pod: Pod, withInitContainer: Boolean = false): Unit = {
+    val testSecretVolume = pod.getSpec.getVolumes.asScala.filter { volume =>
+      volume.getName == s"$TEST_SECRET_NAME-volume"
+    }
+    assert(testSecretVolume.size === 1)
+    assert(testSecretVolume.head.getSecret.getSecretName === TEST_SECRET_NAME)
+
+    checkTestSecretInContainer(pod.getSpec.getContainers.get(0))
+
+    if (withInitContainer) {
+      checkTestSecretInContainer(pod.getSpec.getInitContainers.get(0))
+    }
+  }
+
+  private def checkTestSecretInContainer(container: Container): Unit = {
+    val testSecret = container.getVolumeMounts.asScala.filter { mount =>
+      mount.getName == s"$TEST_SECRET_NAME-volume"
+    }
+    assert(testSecret.size === 1)
+    assert(testSecret.head.getMountPath === TEST_SECRET_MOUNT_PATH)
   }
 }
 
@@ -161,5 +287,13 @@ private[spark] object KubernetesSuite {
     s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}"
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
 
+  val TEST_SECRET_NAME = "test-secret"
+  val TEST_SECRET_KEY = "test-key"
+  val TEST_SECRET_VALUE = "test-data"
+  val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
+
+  val REMOTE_EXAMPLES_JAR_URI =
+    "https://storage.googleapis.com/spark-k8s-integration-tests/jars/spark-examples_2.11-2.3.0.jar"
+
   case object ShuffleNotReadyException extends Exception
 }
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index f1852ba..86adf54 100644
--- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -47,7 +47,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
     val namespaceList = defaultClient
       .namespaces()
       .list()
-      .getItems()
+      .getItems
       .asScala
     require(!namespaceList.exists(_.getMetadata.getName == namespace))
   }
@@ -91,7 +91,8 @@ private[spark] class SparkAppConf {
 
 private[spark] case class SparkAppArguments(
     mainAppResource: String,
-    mainClass: String)
+    mainClass: String,
+    appArgs: Array[String])
 
 private[spark] object SparkAppLauncher extends Logging {
 
@@ -104,7 +105,9 @@ private[spark] object SparkAppLauncher extends Logging {
       "--deploy-mode", "cluster",
       "--class", appArguments.mainClass,
       "--master", appConf.get("spark.master")
-    ) ++ appConf.toStringArray :+ appArguments.mainAppResource
+    ) ++ appConf.toStringArray :+
+      appArguments.mainAppResource :+
+      appArguments.appArgs.mkString(" ")
     logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine, timeoutSecs)
   }
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
index 51f8b96..e1aaf13 100644
--- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTest
 private[spark] trait IntegrationTestBackend {
   def name(): String
   def initialize(): Unit
-  def getKubernetesClient(): DefaultKubernetesClient
+  def getKubernetesClient: DefaultKubernetesClient
   def cleanUp(): Unit = {}
 }
 
diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala
index b3a359f..3d4112b 100644
--- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala
+++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala
@@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
 import org.apache.spark.deploy.k8s.integrationtest.Logging
 
 private[spark] class SparkDockerImageBuilder
-  (private val dockerEnv: Map[String, String]) extends Logging{
+  (private val dockerEnv: Map[String, String]) extends Logging {
 
   private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH
   // Dockerfile paths must be relative to the build path.
@@ -36,6 +36,7 @@ private[spark] class SparkDockerImageBuilder
   private val BASE_DOCKER_FILE = DOCKERFILES_DIR + "spark-base/Dockerfile"
   private val DRIVER_DOCKER_FILE = DOCKERFILES_DIR + "driver/Dockerfile"
   private val EXECUTOR_DOCKER_FILE = DOCKERFILES_DIR + "executor/Dockerfile"
+  private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile"
   private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
   private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
   private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST",
@@ -64,6 +65,7 @@ private[spark] class SparkDockerImageBuilder
     buildImage("spark-base", BASE_DOCKER_FILE)
     buildImage("spark-driver", DRIVER_DOCKER_FILE)
     buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
+    buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
   }
 
   private def buildImage(name: String, dockerFile: String): Unit = {