diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh
index 61959ca2a3041..aa5d847f4be2f 100755
--- a/bin/docker-image-tool.sh
+++ b/bin/docker-image-tool.sh
@@ -197,6 +197,9 @@ do
     if ! which minikube 1>/dev/null; then
       error "Cannot find minikube."
     fi
+    if ! minikube status 1>/dev/null; then
+      error "Cannot contact minikube. Make sure it's running."
+    fi
     eval $(minikube docker-env)
     ;;
   esac
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a0aaef293e96f..ca57df0e31a7f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -374,6 +374,8 @@ object SparkBuild extends PomBuild {
   // SPARK-14738 - Remove docker tests from main Spark build
   // enable(DockerIntegrationTests.settings)(dockerIntegrationTests)
 
+  enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)
+
   /**
    * Adds the ability to run the spark shell directly from SBT without building an assembly
    * jar.
@@ -458,6 +460,65 @@ object DockerIntegrationTests {
   )
 }
 
+/**
+ * These settings run a hardcoded configuration of the Kubernetes integration tests using
+ * minikube. Docker images will have the "dev" tag, and will be overwritten every time the
+ * integration tests are run. The integration tests are actually bound to the "test" phase,
+ * so running "test" on this module will run the integration tests.
+ *
+ * There are two ways to run the tests:
+ * - the "tests" task builds docker images and runs the test, so it's a little slow.
+ * - the "run-its" task just runs the tests on a pre-built set of images.
+ *
+ * Note that this does not use the shell scripts that the maven build uses, which are more
+ * configurable. This is meant as a quick way for developers to run these tests against their
+ * local changes.
+ */
+object KubernetesIntegrationTests {
+  import BuildCommons._
+
+  val dockerBuild = TaskKey[Unit]("docker-imgs", "Build the docker images for ITs.")
+  val runITs = TaskKey[Unit]("run-its", "Only run ITs, skip image build.")
+  val imageTag = settingKey[String]("Tag to use for images built during the test.")
+  val namespace = settingKey[String]("Namespace where to run pods.")
+
+  // Hack: this variable is used to control whether to build docker images. It's updated by
+  // the tasks below in a non-obvious way, so that you get the functionality described in
+  // the scaladoc above.
+  private var shouldBuildImage = true
+
+  lazy val settings = Seq(
+    imageTag := "dev",
+    namespace := "default",
+    dockerBuild := {
+      if (shouldBuildImage) {
+        val dockerTool = s"$sparkHome/bin/docker-image-tool.sh"
+        val cmd = Seq(dockerTool, "-m", "-t", imageTag.value, "build")
+        val ec = Process(cmd).!
+        if (ec != 0) {
+          throw new IllegalStateException(s"Process '${cmd.mkString(" ")}' exited with $ec.")
+        }
+      }
+      shouldBuildImage = true
+    },
+    runITs := Def.taskDyn {
+      shouldBuildImage = false
+      Def.task {
+        (test in Test).value
+      }
+    }.value,
+    test in Test := (test in Test).dependsOn(dockerBuild).value,
+    javaOptions in Test ++= Seq(
+      "-Dspark.kubernetes.test.deployMode=minikube",
+      s"-Dspark.kubernetes.test.imageTag=${imageTag.value}",
+      s"-Dspark.kubernetes.test.namespace=${namespace.value}",
+      s"-Dspark.kubernetes.test.unpackSparkDir=$sparkHome"
+    ),
+    // Force packaging before building images, so that the latest code is tested.
+    dockerBuild := dockerBuild.dependsOn(packageBin in Compile in assembly).value
+  )
+}
+
 /**
  * Overrides to work around sbt's dependency resolution being different from Maven's.
  */
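Usage sketch for the settings above (the module id "kubernetes-integration-tests" and the matching maven profile flags are assumed here; BuildCommons, which defines them, is not part of this patch):

    # Build the "dev"-tagged images against minikube, then run the integration tests.
    build/sbt -Pkubernetes -Pkubernetes-integration-tests 'kubernetes-integration-tests/test'

    # Skip the image build and reuse the images from a previous run.
    build/sbt -Pkubernetes -Pkubernetes-integration-tests 'kubernetes-integration-tests/run-its'

The "run-its" task works by flipping shouldBuildImage to false before delegating to "test"; dockerBuild then skips the build and resets the flag, so the next plain "test" invocation builds images again.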
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 07288c97bd527..301b6fe8eee56 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -155,14 +155,10 @@
         <executions>
           <execution>
             <id>test</id>
+            <phase>none</phase>
             <goals>
               <goal>test</goal>
             </goals>
-            <configuration>
-              <suffixes>
-                (?&lt;!Suite)
-              </suffixes>
-            </configuration>
           </execution>
           <execution>
             <id>integration-test</id>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 6aa1d57085068..b746a01eb5294 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -19,9 +19,11 @@ package org.apache.spark.deploy.k8s.integrationtest
 import java.io.File
 import java.nio.file.{Path, Paths}
 import java.util.UUID
-import java.util.regex.Pattern
 
-import com.google.common.io.PatternFilenameFilter
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
@@ -29,24 +31,22 @@ import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, Tag}
 import org.scalatest.Matchers
 import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
 import org.scalatest.time.{Minutes, Seconds, Span}
-import scala.collection.JavaConverters._
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.k8s.integrationtest.TestConfig._
+import org.apache.spark.{SPARK_VERSION, SparkFunSuite}
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 import org.apache.spark.internal.Logging
 
-private[spark] class KubernetesSuite extends SparkFunSuite
+class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
   with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
   with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
 
-  private var sparkHomeDir: Path = _
-  private var pyImage: String = _
-  private var rImage: String = _
+  protected var sparkHomeDir: Path = _
+  protected var pyImage: String = _
+  protected var rImage: String = _
   protected var image: String = _
   protected var testBackend: IntegrationTestBackend = _
 
@@ -67,6 +67,30 @@
   private val extraExecTotalMemory =
     s"${(1024 + memOverheadConstant*1024 + additionalMemory).toInt}Mi"
 
+  /**
+   * Build the image ref for the given image name, taking the repo and tag from the
+   * test configuration.
+   */
+  private def testImageRef(name: String): String = {
+    val tag = sys.props.get(CONFIG_KEY_IMAGE_TAG_FILE)
+      .map { path =>
+        val tagFile = new File(path)
+        require(tagFile.isFile,
+          s"No file found for image tag at ${tagFile.getAbsolutePath}.")
+        Files.toString(tagFile, Charsets.UTF_8).trim
+      }
+      .orElse(sys.props.get(CONFIG_KEY_IMAGE_TAG))
+      .getOrElse {
+        throw new IllegalArgumentException(
+          s"One of $CONFIG_KEY_IMAGE_TAG_FILE or $CONFIG_KEY_IMAGE_TAG is required.")
+      }
+    val repo = sys.props.get(CONFIG_KEY_IMAGE_REPO)
+      .map { _ + "/" }
+      .getOrElse("")
+
+    s"$repo$name:$tag"
+  }
+
   override def beforeAll(): Unit = {
     super.beforeAll()
     // The scalatest-maven-plugin gives system properties that are referenced but not set null
@@ -83,17 +107,16 @@
     sparkHomeDir = Paths.get(sparkDirProp)
     require(sparkHomeDir.toFile.isDirectory,
       s"No directory found for spark home specified at $sparkHomeDir.")
-    val imageTag = getTestImageTag
-    val imageRepo = getTestImageRepo
-    image = s"$imageRepo/spark:$imageTag"
-    pyImage = s"$imageRepo/spark-py:$imageTag"
-    rImage = s"$imageRepo/spark-r:$imageTag"
-
-    val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
-      .toFile
-      .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
-    containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
-      s"${sparkDistroExamplesJarFile.getName}"
+    image = testImageRef("spark")
+    pyImage = testImageRef("spark-py")
+    rImage = testImageRef("spark-r")
+
+    val scalaVersion = scala.util.Properties.versionNumberString
+      .split("\\.")
+      .take(2)
+      .mkString(".")
+    containerLocalSparkDistroExamplesJar =
+      s"local:///opt/spark/examples/jars/spark-examples_$scalaVersion-${SPARK_VERSION}.jar"
     testBackend = IntegrationTestBackendFactory.getTestBackend
     testBackend.initialize()
     kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
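A note on the resolution order in testImageRef above: spark.kubernetes.test.imageTagFile, when set, takes precedence and the tag is read (and trimmed) from that file; otherwise spark.kubernetes.test.imageTag is used, and it is an error to set neither. The repo prefix is optional: with illustrative values -Dspark.kubernetes.test.imageRepo=docker.io/myrepo and -Dspark.kubernetes.test.imageTag=dev, testImageRef("spark-py") resolves to "docker.io/myrepo/spark-py:dev", while with no repo set it is just "spark-py:dev". This is what lets the sbt settings above pass only -Dspark.kubernetes.test.imageTag, where the removed TestConfig helpers required both the repo and the tag file.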
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
index 004a942c1cdb3..9ead70f670891 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
@@ -28,7 +28,10 @@ object ProcessUtils extends Logging {
   /**
    * executeProcess is used to run a command and return the output if it
    * completes within timeout seconds.
    */
-  def executeProcess(fullCommand: Array[String], timeout: Long, dumpErrors: Boolean = false): Seq[String] = {
+  def executeProcess(
+      fullCommand: Array[String],
+      timeout: Long,
+      dumpErrors: Boolean = false): Seq[String] = {
     val pb = new ProcessBuilder().command(fullCommand: _*)
     pb.redirectErrorStream(true)
     val proc = pb.start()
@@ -41,7 +44,8 @@ object ProcessUtils extends Logging {
     assert(proc.waitFor(timeout, TimeUnit.SECONDS),
       s"Timed out while executing ${fullCommand.mkString(" ")}")
     assert(proc.exitValue == 0,
-      s"Failed to execute ${fullCommand.mkString(" ")}${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
+      s"Failed to execute ${fullCommand.mkString(" ")}" +
+        s"${if (dumpErrors) "\n" + outputLines.mkString("\n")}")
     outputLines
   }
 }
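For reference, a call such as ProcessUtils.executeProcess(Array("minikube", "status"), timeout = 60, dumpErrors = true) (an illustrative command, not one taken from this patch) runs the command with stderr merged into stdout via redirectErrorStream(true) and returns the captured output lines; with dumpErrors enabled, the rewrapped assertion above appends that output to the failure message.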
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
index 06b73107ec236..904279923334f 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PythonTestsSuite.scala
@@ -16,18 +16,14 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
-
 private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
 
   import PythonTestsSuite._
   import KubernetesSuite.k8sTestTag
 
-  private val pySparkDockerImage =
-    s"${getTestImageRepo}/spark-py:${getTestImageTag}"
   test("Run PySpark on simple pi.py example", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", pySparkDockerImage)
+      .set("spark.kubernetes.container.image", pyImage)
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_PI,
       mainClass = "",
@@ -41,7 +37,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run PySpark with Python2 to test a pyfiles example", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", pySparkDockerImage)
+      .set("spark.kubernetes.container.image", pyImage)
       .set("spark.kubernetes.pyspark.pythonVersion", "2")
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_FILES,
@@ -59,7 +55,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run PySpark with Python3 to test a pyfiles example", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", pySparkDockerImage)
+      .set("spark.kubernetes.container.image", pyImage)
       .set("spark.kubernetes.pyspark.pythonVersion", "3")
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_FILES,
@@ -77,7 +73,7 @@ private[spark] trait PythonTestsSuite { k8sSuite: KubernetesSuite =>
 
   test("Run PySpark with memory customization", k8sTestTag) {
     sparkAppConf
-      .set("spark.kubernetes.container.image", pySparkDockerImage)
+      .set("spark.kubernetes.container.image", pyImage)
      .set("spark.kubernetes.pyspark.pythonVersion", "3")
       .set("spark.kubernetes.memoryOverheadFactor", s"$memOverheadConstant")
       .set("spark.executor.pyspark.memory", s"${additionalMemory}m")
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
index 885a23cfb4864..e81562a923228 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
@@ -16,16 +16,13 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import org.apache.spark.deploy.k8s.integrationtest.TestConfig.{getTestImageRepo, getTestImageTag}
-
 private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>
 
   import RTestsSuite._
   import KubernetesSuite.k8sTestTag
 
   test("Run SparkR on simple dataframe.R example", k8sTestTag) {
-    sparkAppConf
-      .set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-r:${getTestImageTag}")
+    sparkAppConf.set("spark.kubernetes.container.image", rImage)
     runSparkApplicationAndVerifyCompletion(
       appResource = SPARK_R_DATAFRAME_TEST,
       mainClass = "",
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala
deleted file mode 100644
index 363ec0a6016bb..0000000000000
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConfig.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.k8s.integrationtest
-
-import java.io.File
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-
-import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
-
-object TestConfig {
-  def getTestImageTag: String = {
-    val imageTagFileProp = System.getProperty(CONFIG_KEY_IMAGE_TAG_FILE)
-    require(imageTagFileProp != null, "Image tag file must be provided in system properties.")
-    val imageTagFile = new File(imageTagFileProp)
-    require(imageTagFile.isFile, s"No file found for image tag at ${imageTagFile.getAbsolutePath}.")
-    Files.toString(imageTagFile, Charsets.UTF_8).trim
-  }
-
-  def getTestImageRepo: String = {
-    val imageRepo = System.getProperty(CONFIG_KEY_IMAGE_REPO)
-    require(imageRepo != null, "Image repo must be provided in system properties.")
-    imageRepo
-  }
-}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
index eeae70cd68571..ecc4df716330d 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestConstants.scala
@@ -26,7 +26,7 @@ object TestConstants {
   val CONFIG_KEY_KUBE_MASTER_URL = "spark.kubernetes.test.master"
   val CONFIG_KEY_KUBE_NAMESPACE = "spark.kubernetes.test.namespace"
   val CONFIG_KEY_KUBE_SVC_ACCOUNT = "spark.kubernetes.test.serviceAccountName"
-  val CONFIG_KEY_IMAGE_TAG = "spark.kubernetes.test.imageTagF"
+  val CONFIG_KEY_IMAGE_TAG = "spark.kubernetes.test.imageTag"
   val CONFIG_KEY_IMAGE_TAG_FILE = "spark.kubernetes.test.imageTagFile"
   val CONFIG_KEY_IMAGE_REPO = "spark.kubernetes.test.imageRepo"
   val CONFIG_KEY_UNPACK_DIR = "spark.kubernetes.test.unpackSparkDir"
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
index 7bf324c6c4a14..56ddae0c9c57c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest.backend
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants._
 import org.apache.spark.deploy.k8s.integrationtest.backend.cloud.KubeConfigBackend
 import org.apache.spark.deploy.k8s.integrationtest.backend.docker.DockerForDesktopBackend
@@ -35,7 +36,8 @@ private[spark] object IntegrationTestBackendFactory {
       .getOrElse(BACKEND_MINIKUBE)
     deployMode match {
       case BACKEND_MINIKUBE => MinikubeTestBackend
-      case BACKEND_CLOUD => new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT))
+      case BACKEND_CLOUD =>
+        new KubeConfigBackend(System.getProperty(CONFIG_KEY_KUBE_CONFIG_CONTEXT))
       case BACKEND_DOCKER_FOR_DESKTOP => DockerForDesktopBackend
       case _ =>
         throw new IllegalArgumentException("Invalid " + CONFIG_KEY_DEPLOY_MODE + ": " + deployMode)
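Selection note: the factory keys off spark.kubernetes.test.deployMode (CONFIG_KEY_DEPLOY_MODE), defaulting to the minikube backend, which is exactly what the sbt settings in SparkBuild.scala pass. Choosing the cloud backend makes it read the kubeconfig context from the property named by CONFIG_KEY_KUBE_CONFIG_CONTEXT; the literal values of BACKEND_CLOUD and that key live in TestConstants and are not shown in this hunk.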
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
index 333526ba3ef98..be1834c0b5dea 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala
@@ -18,9 +18,10 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.cloud
 
 import java.nio.file.Paths
 
-import io.fabric8.kubernetes.client.utils.Utils
 import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient}
+import io.fabric8.kubernetes.client.utils.Utils
 import org.apache.commons.lang3.StringUtils
+
 import org.apache.spark.deploy.k8s.integrationtest.TestConstants
 import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
 import org.apache.spark.internal.Logging
@@ -38,7 +39,7 @@ private[spark] class KubeConfigBackend(var context: String)
   // Auto-configure K8S client from K8S config file
   if (Utils.getSystemPropertyOrEnvVar(Config.KUBERNETES_KUBECONFIG_FILE, null: String) == null) {
     // Fabric 8 client will automatically assume a default location in this case
-    logWarning(s"No explicit KUBECONFIG specified, will assume .kube/config under your home directory")
+    logWarning("No explicit KUBECONFIG specified, will assume $HOME/.kube/config")
   }
   val config = Config.autoConfigure(context)
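On the reworded log message: when the fabric8 client's kubeconfig location (Config.KUBERNETES_KUBECONFIG_FILE, which the client resolves from the "kubeconfig" system property or the KUBECONFIG environment variable) is unset, Config.autoConfigure(context) falls back to $HOME/.kube/config. So pointing the cloud backend at a different cluster configuration should be as simple as, for example:

    export KUBECONFIG=/path/to/kubeconfig   # illustrative path

before launching the tests.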