From 0d8c7f6d883818cbdd2b830d1829b505e8263e2a Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 30 Oct 2018 11:18:16 -0700 Subject: [PATCH 1/3] [SPARK-25897][k8s] Hook up k8s integration tests to sbt build. The integration tests can now be run in sbt if the right profile is enabled, using the "test" task under the respective project. This avoids having to fall back to maven to run the tests, which invalidates all your compiled stuff when you go back to sbt, making development way slower than it should be. There's also a task to run the tests directly without refreshing the docker images, which is helpful if you just made a change to the submission code which should not affect the code in the images. The sbt tasks currently are not very customizable; there are some very minor things you can set in the sbt shell itself, but otherwise it's hardcoded to run on minikube. I also had to make some slight adjustments to the IT code itself, mostly to remove assumptions about the existing harness. Tested on sbt and maven. 
--- project/SparkBuild.scala | 58 +++++++++++++++++ .../kubernetes/integration-tests/pom.xml | 6 +- .../k8s/integrationtest/KubernetesSuite.scala | 64 +++++++++++++------ .../integrationtest/PythonTestsSuite.scala | 12 ++-- .../k8s/integrationtest/RTestsSuite.scala | 5 +- .../k8s/integrationtest/TestConfig.scala | 38 ----------- 6 files changed, 108 insertions(+), 75 deletions(-) delete mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a0aaef293e96f..00831fd77f57d 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -374,6 +374,8 @@ object SparkBuild extends PomBuild { // SPARK-14738 - Remove docker tests from main Spark build // enable(DockerIntegrationTests.settings)(dockerIntegrationTests) + enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests) + /** * Adds the ability to run the spark shell directly from SBT without building an assembly * jar. @@ -458,6 +460,62 @@ object DockerIntegrationTests { ) } +/** + * These settings run a hardcoded configuration of the Kubernetes integration tests using + * minikube. Docker images will have the "dev" tag, and will be overwritten every time the + * integration tests are run. The integration tests are actually bound to the "test" phase, + * so running "test" on this module will run the integration tests. + * + * There are two ways to run the tests: + * - the "test" task builds docker images and runs the tests, so it's a little slow. + * - the "run-its" task just runs the tests on a pre-built set of images. + * + * Note that this does not use the shell scripts that the maven build uses, which are more + * configurable. This is meant as a quick way for developers to run these tests against their + * local changes. 
+ */ +object KubernetesIntegrationTests { + import BuildCommons._ + + val dockerBuild = TaskKey[Unit]("docker-imgs", "Build the docker images for ITs.") + val runITs = TaskKey[Unit]("run-its", "Only run ITs, skip image build.") + val imageTag = settingKey[String]("Tag to use for images built during the test.") + val namespace = settingKey[String]("Namespace where to run pods.") + + // Hack: this variable is used to control whether to build docker images. It's updated by + // the tasks below in a non-obvious way, so that you get the functionality described in + // the scaladoc above. + private var shouldBuildImage = true + + lazy val settings = Seq( + imageTag := "dev", + namespace := "default", + dockerBuild := { + if (shouldBuildImage) { + val dockerTool = s"$sparkHome/bin/docker-image-tool.sh" + val cmd = Seq(dockerTool, "-m", "-t", imageTag.value, "build") + Process(cmd).! + } + shouldBuildImage = true + }, + runITs := Def.taskDyn { + shouldBuildImage = false + Def.task { + (test in Test).value + } + }.value, + test in Test := (test in Test).dependsOn(dockerBuild).value, + javaOptions in Test ++= Seq( + "-Dspark.kubernetes.test.deployMode=minikube", + s"-Dspark.kubernetes.test.imageTag=${imageTag.value}", + s"-Dspark.kubernetes.test.namespace=${namespace.value}", + s"-Dspark.kubernetes.test.unpackSparkDir=$sparkHome" + ), + // Force packaging before building images, so that the latest code is tested. + dockerBuild := dockerBuild.dependsOn(packageBin in Compile in assembly).value + ) +} + /** * Overrides to work around sbt's dependency resolution being different from Maven's. 
*/ diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index a07fe1feea3eb..6d133d6b98f4a 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -145,14 +145,10 @@ test + none test - - - (?<!Suite) - integration-test diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index e2e5880255e2c..706fcce8888f9 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -19,9 +19,11 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.File import java.nio.file.{Path, Paths} import java.util.UUID -import java.util.regex.Pattern -import com.google.common.io.PatternFilenameFilter +import scala.collection.JavaConverters._ + +import com.google.common.base.Charsets +import com.google.common.io.Files import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action @@ -29,23 +31,21 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag} import org.scalatest.Matchers import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} -import scala.collection.JavaConverters._ -import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.k8s.integrationtest.TestConfig._ +import org.apache.spark.{SPARK_VERSION, SparkFunSuite} import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, 
IntegrationTestBackendFactory} import org.apache.spark.internal.Logging -private[spark] class KubernetesSuite extends SparkFunSuite +class KubernetesSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with Logging with Eventually with Matchers { import KubernetesSuite._ - private var sparkHomeDir: Path = _ - private var pyImage: String = _ - private var rImage: String = _ + protected var sparkHomeDir: Path = _ + protected var pyImage: String = _ + protected var rImage: String = _ protected var image: String = _ protected var testBackend: IntegrationTestBackend = _ @@ -66,6 +66,31 @@ private[spark] class KubernetesSuite extends SparkFunSuite private val extraExecTotalMemory = s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi" + /** + * Build the image ref for the given image name, taking the repo and tag from the + * test configuration. + */ + private def testImageRef(name: String): String = { + val tag = sys.props.get("spark.kubernetes.test.imageTagFile") + .map { path => + val tagFile = new File(path) + require(tagFile.isFile, + s"No file found for image tag at ${tagFile.getAbsolutePath}.") + Files.toString(tagFile, Charsets.UTF_8).trim + } + .orElse(sys.props.get("spark.kubernetes.test.imageTag")) + .getOrElse { + throw new IllegalArgumentException( + "One of spark.kubernetes.test.imageTagFile or " + + "spark.kubernetes.test.imageTag is required.") + } + val repo = sys.props.get("spark.kubernetes.test.imageRepo") + .map { _ + "/" } + .getOrElse("") + + s"$repo$name:$tag" + } + override def beforeAll(): Unit = { super.beforeAll() // The scalatest-maven-plugin gives system properties that are referenced but not set null @@ -82,17 +107,16 @@ private[spark] class KubernetesSuite extends SparkFunSuite sparkHomeDir = Paths.get(sparkDirProp) require(sparkHomeDir.toFile.isDirectory, s"No directory found for spark home 
specified at $sparkHomeDir.") - val imageTag = getTestImageTag - val imageRepo = getTestImageRepo - image = s"$imageRepo/spark:$imageTag" - pyImage = s"$imageRepo/spark-py:$imageTag" - rImage = s"$imageRepo/spark-r:$imageTag" - - val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars")) - .toFile - .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0) - containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" + - s"${sparkDistroExamplesJarFile.getName}" + image = testImageRef("spark") + pyImage = testImageRef("spark-py") + rImage = testImageRef("spark-r") + + val scalaVersion = scala.util.Properties.versionNumberString + .split("\\.") + .take(2) + .mkString(".") + containerLocalSparkDistroExamplesJar = + s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar" testBackend = IntegrationTestBackendFactory.getTestBackend testBackend.initialize() kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala index 06b73107ec236..904279923334f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala @@ -16,18 +16,14 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag} - private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => import PythonTestsSuite._ import KubernetesSuite.k8sTestTag - private val pySparkDockerImage = - 
s"${getTestImageRepo}/spark-py:${getTestImageTag}" test("Run PySpark on simple pi.py example", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", pySparkDockerImage) + .set("spark.kubernetes.container.image", pyImage) runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_PI, mainClass = "", @@ -41,7 +37,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", pySparkDockerImage) + .set("spark.kubernetes.container.image", pyImage) .set("spark.kubernetes.pyspark.pythonVersion", "2") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_FILES, @@ -59,7 +55,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", pySparkDockerImage) + .set("spark.kubernetes.container.image", pyImage) .set("spark.kubernetes.pyspark.pythonVersion", "3") runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_FILES, @@ -77,7 +73,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite => test("Run PySpark with memory customization", k8sTestTag) { sparkAppConf - .set("spark.kubernetes.container.image", pySparkDockerImage) + .set("spark.kubernetes.container.image", pyImage) .set("spark.kubernetes.pyspark.pythonVersion", "3") .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant") .set("spark.executor.pyspark.memory", s"${additionalMemory}m") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala index 885a23cfb4864..e81562a923228 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala @@ -16,16 +16,13 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag} - private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite => import RTestsSuite._ import KubernetesSuite.k8sTestTag test("Run SparkR on simple dataframe.R example", k8sTestTag) { - sparkAppConf - .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}") + sparkAppConf.set("spark.kubernetes.container.image", rImage) runSparkApplicationAndVerifyCompletion( appResource = SPARK_R_DATAFRAME_TEST, mainClass = "", diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala deleted file mode 100644 index 5a49e0779160c..0000000000000 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.integrationtest - -import java.io.File - -import com.google.common.base.Charsets -import com.google.common.io.Files - -object TestConfig { - def getTestImageTag: String = { - val imageTagFileProp = System.getProperty("spark.kubernetes.test.imageTagFile") - require(imageTagFileProp != null, "Image tag file must be provided in system properties.") - val imageTagFile = new File(imageTagFileProp) - require(imageTagFile.isFile, s"No file found for image tag at ${imageTagFile.getAbsolutePath}.") - Files.toString(imageTagFile, Charsets.UTF_8).trim - } - - def getTestImageRepo: String = { - val imageRepo = System.getProperty("spark.kubernetes.test.imageRepo") - require(imageRepo != null, "Image repo must be provided in system properties.") - imageRepo - } -} From 534b41bda830fc03cbdabcf76a0204499f742d88 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 1 Nov 2018 10:20:47 -0700 Subject: [PATCH 2/3] Fix style in new code from master. 
--- .../spark/deploy/k8s/integrationtest/ProcessUtils.scala | 8 ++++++-- .../integrationtest/backend/IntegrationTestBackend.scala | 4 +++- .../integrationtest/backend/cloud/KubeConfigBackend.scala | 5 +++-- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index 004a942c1cdb3..9ead70f670891 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -28,7 +28,10 @@ object ProcessUtils extends Logging { * executeProcess is used to run a command and return the output if it * completes within timeout seconds. */ - def executeProcess(fullCommand: Array[String], timeout: Long, dumpErrors: Boolean = false): Seq[String] = { + def executeProcess( + fullCommand: Array[String], + timeout: Long, + dumpErrors: Boolean = false): Seq[String] = { val pb = new ProcessBuilder().command(fullCommand: _*) pb.redirectErrorStream(true) val proc = pb.start() @@ -41,7 +44,8 @@ object ProcessUtils extends Logging { assert(proc.waitFor(timeout, TimeUnit.SECONDS), s"Timed out while executing ${fullCommand.mkString(" ")}") assert(proc.exitValue == 0, - s"Failed to execute ${fullCommand.mkString(" ")}${if (dumpErrors) "\n" + outputLines.mkString("\n")}") + s"Failed to execute ${fullCommand.mkString(" ")}" + + s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}") outputLines } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala index 7bf324c6c4a14..56ddae0c9c57c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest.backend import io.fabric8.kubernetes.client.DefaultKubernetesClient + import org.apache.spark.deploy.k8s.integrationtest.TestConstants._ import org.apache.spark.deploy.k8s.integrationtest.backend.cloud.KubeConfigBackend import org.apache.spark.deploy.k8s.integrationtest.backend.docker.DockerForDesktopBackend @@ -35,7 +36,8 @@ private[spark] object IntegrationTestBackendFactory { .getOrElse(BACKEND_MINIKUBE) deployMode match { case BACKEND_MINIKUBE => MinikubeTestBackend - case BACKEND_CLOUD => new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT)) + case BACKEND_CLOUD => + new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT)) case BACKEND_DOCKER_FOR_DESKTOP => DockerForDesktopBackend case _ => throw new IllegalArgumentException("Invalid " + CONFIG_KEY_DEPLOY_MODE + ": " + deployMode) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala index 333526ba3ef98..be1834c0b5dea 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala @@ -18,9 +18,10 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.cloud import java.nio.file.Paths -import io.fabric8.kubernetes.client.utils.Utils import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.utils.Utils import org.apache.commons.lang3.StringUtils + import org.apache.spark.deploy.k8s.integrationtest.TestConstants import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend import org.apache.spark.internal.Logging @@ -38,7 +39,7 @@ private[spark] class KubeConfigBackend(var context: String) // Auto-configure K8S client from K8S config file if (Utils.getSystemPropertyOrEnvVar(Config.KUBERNETES_KUBECONFIG_FILE, null: String) == null) { // Fabric 8 client will automatically assume a default location in this case - logWarning(s"No explicit KUBECONFIG specified, will assume .kube/config under your home directory") + logWarning("No explicit KUBECONFIG specified, will assume $HOME/.kube/config") } val config = Config.autoConfigure(context) From f07f50c4e495eb25f92a930e424a579da68c5be6 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 1 Nov 2018 10:31:55 -0700 Subject: [PATCH 3/3] A little better error handling. --- bin/docker-image-tool.sh | 3 +++ project/SparkBuild.scala | 5 ++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index 61959ca2a3041..aa5d847f4be2f 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -197,6 +197,9 @@ do if ! which minikube 1>/dev/null; then error "Cannot find minikube." fi + if ! minikube status 1>/dev/null; then + error "Cannot contact minikube. Make sure it's running." 
+ fi eval $(minikube docker-env) ;; esac diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 00831fd77f57d..ca57df0e31a7f 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -494,7 +494,10 @@ object KubernetesIntegrationTests { if (shouldBuildImage) { val dockerTool = s"$sparkHome/bin/docker-image-tool.sh" val cmd = Seq(dockerTool, "-m", "-t", imageTag.value, "build") - Process(cmd).! + val ec = Process(cmd).! + if (ec != 0) { + throw new IllegalStateException(s"Process '${cmd.mkString(" ")}' exited with $ec.") + } } shouldBuildImage = true },