diff --git a/README.md b/README.md
index 1e521a7e7b178..531d330234062 100644
--- a/README.md
+++ b/README.md
@@ -81,6 +81,8 @@ can be run using:
Please see the guidance on how to
[run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).
+There is also a Kubernetes integration test suite; see resource-managers/kubernetes/integration-tests/README.md for details.
+
## A Note About Hadoop Versions
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
diff --git a/dev/tox.ini b/dev/tox.ini
index 583c1eaaa966b..28dad8f3b5c7c 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -16,4 +16,4 @@
[pycodestyle]
ignore=E402,E731,E241,W503,E226,E722,E741,E305
max-line-length=100
-exclude=cloudpickle.py,heapq3.py,shared.py,python/docs/conf.py,work/*/*.py,python/.eggs/*
+exclude=cloudpickle.py,heapq3.py,shared.py,python/docs/conf.py,work/*/*.py,python/.eggs/*,dist/*
diff --git a/pom.xml b/pom.xml
index 883c096ae1ae9..23bbd3b09734e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2705,6 +2705,7 @@
     <profile>
       <id>kubernetes</id>
       <modules>
         <module>resource-managers/kubernetes/core</module>
+        <module>resource-managers/kubernetes/integration-tests</module>
       </modules>
     </profile>
diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md
new file mode 100644
index 0000000000000..b3863e6b7d1af
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/README.md
@@ -0,0 +1,52 @@
+---
+layout: global
+title: Spark on Kubernetes Integration Tests
+---
+
+# Running the Kubernetes Integration Tests
+
+Note that the integration test framework is currently being heavily revised and
+is subject to change. The integration tests currently run only with Java 8.
+
+The simplest way to run the integration tests is to install and run Minikube, then run the following:
+
+ dev/dev-run-integration-tests.sh
+
+The minimum tested version of Minikube is 0.23.0. The kube-dns addon must be enabled. Minikube should
+run with a minimum of 3 CPUs and 4G of memory:
+
+ minikube start --cpus 3 --memory 4096
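+
+If the kube-dns addon is not already enabled, it can be turned on with (addon names may vary across Minikube versions):
+
+    minikube addons enable kube-dns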
+
+You can download Minikube [here](https://github.com/kubernetes/minikube/releases).
+
+# Integration Test Customization
+
+The integration test runtime is configured by passing arguments to the test script. The most useful options are outlined below.
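+
+For example, a single run can combine several of the options below (the paths shown are illustrative):
+
+    dev/dev-run-integration-tests.sh \
+      --deploy-mode minikube \
+      --image-repo docker.io/kubespark \
+      --spark-tgz /path/to/spark-dist.tgz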
+
+## Re-using Docker Images
+
+By default, the test framework builds new Docker images on every test execution. A unique image tag is generated
+and written to the file `target/imageTag.txt`. To reuse the images built in a previous run, or to use a Docker image
+tag that you have already built by other means, pass the tag to the test script:
+
+    dev/dev-run-integration-tests.sh --image-tag <tag>
+
+For example, to reuse the images that the framework built in a previous run:
+
+    dev/dev-run-integration-tests.sh --image-tag $(cat target/imageTag.txt)
+
+## Spark Distribution Under Test
+
+The Spark code under test is provided to the integration test system as a tarball, specified with the following option:
+
+* `--spark-tgz <spark-tgz>` - set `<spark-tgz>` to point to a tarball containing the Spark distribution to test.
+
+TODO: Don't require the packaging of the built Spark artifacts into this tarball, just read them out of the current tree.
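+
+For example, a distribution tarball can be produced at the repository root with `dev/make-distribution.sh`
+(the flags and resulting file name below are illustrative and depend on the build):
+
+    ./dev/make-distribution.sh --name k8s --tgz -Pkubernetes
+
+and then passed to the test script:
+
+    dev/dev-run-integration-tests.sh --spark-tgz /path/to/spark-2.4.0-SNAPSHOT-bin-k8s.tgz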
+
+## Customizing the Namespace and Service Account
+
+* `--namespace <namespace>` - set `<namespace>` to the namespace in which the tests should be run.
+* `--service-account <service account name>` - set `<service account name>` to the name of the Kubernetes service account to
+use in the namespace specified by `--namespace`. The service account is expected to have permission to get, list, watch,
+and create pods. For clusters with RBAC turned on, the right permissions must be granted to the service account
+in the namespace through an appropriate role and role binding. A reference RBAC configuration is provided in `dev/spark-rbac.yaml`.
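+
+For example, to create the reference namespace and service account from `dev/spark-rbac.yaml` and run the
+tests against them (assuming `kubectl` is configured for the test cluster):
+
+    kubectl apply -f dev/spark-rbac.yaml
+    dev/dev-run-integration-tests.sh --namespace spark --service-account spark-sa --spark-tgz <spark-tgz>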
diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
new file mode 100755
index 0000000000000..ea893fa39eede
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+TEST_ROOT_DIR=$(git rev-parse --show-toplevel)/resource-managers/kubernetes/integration-tests
+
+cd "${TEST_ROOT_DIR}"
+
+DEPLOY_MODE="minikube"
+IMAGE_REPO="docker.io/kubespark"
+SPARK_TGZ="N/A"
+IMAGE_TAG="N/A"
+SPARK_MASTER=
+NAMESPACE=
+SERVICE_ACCOUNT=
+
+# Parse arguments
+while (( "$#" )); do
+ case $1 in
+ --image-repo)
+ IMAGE_REPO="$2"
+ shift
+ ;;
+ --image-tag)
+ IMAGE_TAG="$2"
+ shift
+ ;;
+ --deploy-mode)
+ DEPLOY_MODE="$2"
+ shift
+ ;;
+ --spark-tgz)
+ SPARK_TGZ="$2"
+ shift
+ ;;
+ --spark-master)
+ SPARK_MASTER="$2"
+ shift
+ ;;
+ --namespace)
+ NAMESPACE="$2"
+ shift
+ ;;
+ --service-account)
+ SERVICE_ACCOUNT="$2"
+ shift
+ ;;
+ *)
+ break
+ ;;
+ esac
+ shift
+done
+
+cd "$TEST_ROOT_DIR"
+
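+# Forward the parsed options to Maven as system properties; the scalatest plugin configured in
+# this module's pom.xml reads them to configure the test run.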
+properties=(
+ -Dspark.kubernetes.test.sparkTgz=$SPARK_TGZ \
+ -Dspark.kubernetes.test.imageTag=$IMAGE_TAG \
+ -Dspark.kubernetes.test.imageRepo=$IMAGE_REPO \
+ -Dspark.kubernetes.test.deployMode=$DEPLOY_MODE
+)
+
+if [ -n "$NAMESPACE" ];
+then
+  properties=( "${properties[@]}" -Dspark.kubernetes.test.namespace=$NAMESPACE )
+fi
+
+if [ -n "$SERVICE_ACCOUNT" ];
+then
+  properties=( "${properties[@]}" -Dspark.kubernetes.test.serviceAccountName=$SERVICE_ACCOUNT )
+fi
+
+if [ -n "$SPARK_MASTER" ];
+then
+  properties=( "${properties[@]}" -Dspark.kubernetes.test.master=$SPARK_MASTER )
+fi
+
+../../../build/mvn integration-test "${properties[@]}"
diff --git a/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
new file mode 100644
index 0000000000000..a4c242f2f2645
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: v1
+kind: Namespace
+metadata:
+ name: spark
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: spark-sa
+ namespace: spark
+---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRole
+metadata:
+ name: spark-role
+rules:
+- apiGroups:
+ - ""
+ resources:
+ - "pods"
+ verbs:
+ - "*"
+---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+ name: spark-role-binding
+subjects:
+- kind: ServiceAccount
+ name: spark-sa
+ namespace: spark
+roleRef:
+ kind: ClusterRole
+ name: spark-role
+ apiGroup: rbac.authorization.k8s.io
\ No newline at end of file
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
new file mode 100644
index 0000000000000..520bda89e034d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes-integration-tests_2.11</artifactId>
+  <groupId>spark-kubernetes-integration-tests</groupId>
+  <properties>
+    <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
+    <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
+    <extraScalaTestArgs></extraScalaTestArgs>
+    <kubernetes-client.version>3.0.0</kubernetes-client.version>
+    <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
+    <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
+    <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
+    <spark.kubernetes.test.unpackSparkDir>${project.build.directory}/spark-dist-unpacked</spark.kubernetes.test.unpackSparkDir>
+    <spark.kubernetes.test.imageTag>N/A</spark.kubernetes.test.imageTag>
+    <spark.kubernetes.test.imageTagFile>${project.build.directory}/imageTag.txt</spark.kubernetes.test.imageTagFile>
+    <spark.kubernetes.test.deployMode>minikube</spark.kubernetes.test.deployMode>
+    <spark.kubernetes.test.imageRepo>docker.io/kubespark</spark.kubernetes.test.imageRepo>
+  </properties>
+
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes Integration Tests</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>${exec-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>setup-integration-test-env</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>scripts/setup-integration-test-env.sh</executable>
+              <arguments>
+                <argument>--unpacked-spark-tgz</argument>
+                <argument>${spark.kubernetes.test.unpackSparkDir}</argument>
+
+                <argument>--image-repo</argument>
+                <argument>${spark.kubernetes.test.imageRepo}</argument>
+
+                <argument>--image-tag</argument>
+                <argument>${spark.kubernetes.test.imageTag}</argument>
+
+                <argument>--image-tag-output-file</argument>
+                <argument>${spark.kubernetes.test.imageTagFile}</argument>
+
+                <argument>--deploy-mode</argument>
+                <argument>${spark.kubernetes.test.deployMode}</argument>
+
+                <argument>--spark-tgz</argument>
+                <argument>${spark.kubernetes.test.sparkTgz}</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>${scalatest-maven-plugin.version}</version>
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>SparkTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=512m ${extraScalaTestArgs}</argLine>
+          <stderr/>
+          <systemProperties>
+            <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.kubernetes.test.imageTagFile>${spark.kubernetes.test.imageTagFile}</spark.kubernetes.test.imageTagFile>
+            <spark.kubernetes.test.unpackSparkDir>${spark.kubernetes.test.unpackSparkDir}</spark.kubernetes.test.unpackSparkDir>
+            <spark.kubernetes.test.imageRepo>${spark.kubernetes.test.imageRepo}</spark.kubernetes.test.imageRepo>
+            <spark.kubernetes.test.deployMode>${spark.kubernetes.test.deployMode}</spark.kubernetes.test.deployMode>
+            <spark.kubernetes.test.master>${spark.kubernetes.test.master}</spark.kubernetes.test.master>
+            <spark.kubernetes.test.namespace>${spark.kubernetes.test.namespace}</spark.kubernetes.test.namespace>
+            <spark.kubernetes.test.serviceAccountName>${spark.kubernetes.test.serviceAccountName}</spark.kubernetes.test.serviceAccountName>
+          </systemProperties>
+          <tagsToExclude>${test.exclude.tags}</tagsToExclude>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <!-- The negative pattern below prevents integration tests such as
+                   KubernetesSuite from running in the test phase. -->
+              <suffixes>(?&lt;!Suite)</suffixes>
+            </configuration>
+          </execution>
+          <execution>
+            <id>integration-test</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
new file mode 100755
index 0000000000000..ccfb8e767c529
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+TEST_ROOT_DIR=$(git rev-parse --show-toplevel)
+UNPACKED_SPARK_TGZ="$TEST_ROOT_DIR/target/spark-dist-unpacked"
+IMAGE_TAG_OUTPUT_FILE="$TEST_ROOT_DIR/target/image-tag.txt"
+DEPLOY_MODE="minikube"
+IMAGE_REPO="docker.io/kubespark"
+IMAGE_TAG="N/A"
+SPARK_TGZ="N/A"
+
+# Parse arguments
+while (( "$#" )); do
+ case $1 in
+ --unpacked-spark-tgz)
+ UNPACKED_SPARK_TGZ="$2"
+ shift
+ ;;
+ --image-repo)
+ IMAGE_REPO="$2"
+ shift
+ ;;
+ --image-tag)
+ IMAGE_TAG="$2"
+ shift
+ ;;
+ --image-tag-output-file)
+ IMAGE_TAG_OUTPUT_FILE="$2"
+ shift
+ ;;
+ --deploy-mode)
+ DEPLOY_MODE="$2"
+ shift
+ ;;
+ --spark-tgz)
+ SPARK_TGZ="$2"
+ shift
+ ;;
+ *)
+ break
+ ;;
+ esac
+ shift
+done
+
+if [[ $SPARK_TGZ == "N/A" ]];
+then
+ echo "Must specify a Spark tarball to build Docker images against with --spark-tgz." && exit 1;
+fi
+
+rm -rf $UNPACKED_SPARK_TGZ
+mkdir -p $UNPACKED_SPARK_TGZ
+tar -xzvf $SPARK_TGZ --strip-components=1 -C $UNPACKED_SPARK_TGZ;
+
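+# If no tag was provided, build fresh images under a newly generated unique tag so that
+# images from different test runs do not collide.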
+if [[ $IMAGE_TAG == "N/A" ]];
+then
+ IMAGE_TAG=$(uuidgen);
+ cd $UNPACKED_SPARK_TGZ
+ if [[ $DEPLOY_MODE == cloud ]] ;
+ then
+ $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG build
+ if [[ $IMAGE_REPO == gcr.io* ]] ;
+ then
+ gcloud docker -- push $IMAGE_REPO/spark:$IMAGE_TAG
+ else
+ $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG push
+ fi
+ else
+    # The -m option points the image build at Minikube's Docker daemon.
+ $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -m -r $IMAGE_REPO -t $IMAGE_TAG build
+ fi
+ cd -
+fi
+
+rm -f $IMAGE_TAG_OUTPUT_FILE
+echo -n $IMAGE_TAG > $IMAGE_TAG_OUTPUT_FILE
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties b/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000000000..866126bc3c1c2
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/integration-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/integration-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
new file mode 100644
index 0000000000000..65c513cf241a4
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.UUID
+import java.util.regex.Pattern
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.PatternFilenameFilter
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Seconds, Span}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
+import org.apache.spark.deploy.k8s.integrationtest.config._
+
+private[spark] class KubernetesSuite extends SparkFunSuite
+ with BeforeAndAfterAll with BeforeAndAfter {
+
+ import KubernetesSuite._
+
+ private var testBackend: IntegrationTestBackend = _
+ private var sparkHomeDir: Path = _
+ private var kubernetesTestComponents: KubernetesTestComponents = _
+ private var sparkAppConf: SparkAppConf = _
+ private var image: String = _
+ private var containerLocalSparkDistroExamplesJar: String = _
+ private var appLocator: String = _
+ private var driverPodName: String = _
+
+ override def beforeAll(): Unit = {
+    // The scalatest-maven-plugin passes along system properties that are referenced in the pom
+    // but never set, giving them the literal value "null". Remove these null-valued properties
+    // before initializing the test backend.
+ val nullValueProperties = System.getProperties.asScala
+ .filter(entry => entry._2.equals("null"))
+ .map(entry => entry._1.toString)
+ nullValueProperties.foreach { key =>
+ System.clearProperty(key)
+ }
+
+ val sparkDirProp = System.getProperty("spark.kubernetes.test.unpackSparkDir")
+ require(sparkDirProp != null, "Spark home directory must be provided in system properties.")
+ sparkHomeDir = Paths.get(sparkDirProp)
+ require(sparkHomeDir.toFile.isDirectory,
+ s"No directory found for spark home specified at $sparkHomeDir.")
+ val imageTag = getTestImageTag
+ val imageRepo = getTestImageRepo
+ image = s"$imageRepo/spark:$imageTag"
+
+ val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
+ .toFile
+ .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
+ containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
+ s"${sparkDistroExamplesJarFile.getName}"
+ testBackend = IntegrationTestBackendFactory.getTestBackend
+ testBackend.initialize()
+ kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
+ }
+
+ override def afterAll(): Unit = {
+ testBackend.cleanUp()
+ }
+
+ before {
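+    // Generate unique labels for each run so the driver and executor pods launched by a test
+    // can be located unambiguously.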
+ appLocator = UUID.randomUUID().toString.replaceAll("-", "")
+ driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
+ sparkAppConf = kubernetesTestComponents.newSparkAppConf()
+ .set("spark.kubernetes.container.image", image)
+ .set("spark.kubernetes.driver.pod.name", driverPodName)
+ .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
+ .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
+ if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+ kubernetesTestComponents.createNamespace()
+ }
+ }
+
+ after {
+ if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+ kubernetesTestComponents.deleteNamespace()
+ }
+ deleteDriverPod()
+ }
+
+ test("Run SparkPi with no resources") {
+ runSparkPiAndVerifyCompletion()
+ }
+
+ test("Run SparkPi with a very long application name.") {
+ sparkAppConf.set("spark.app.name", "long" * 40)
+ runSparkPiAndVerifyCompletion()
+ }
+
+ test("Run SparkPi with a master URL without a scheme.") {
+ val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
+ val k8sMasterUrl = if (url.getPort < 0) {
+ s"k8s://${url.getHost}"
+ } else {
+ s"k8s://${url.getHost}:${url.getPort}"
+ }
+ sparkAppConf.set("spark.master", k8sMasterUrl)
+ runSparkPiAndVerifyCompletion()
+ }
+
+ test("Run SparkPi with an argument.") {
+ runSparkPiAndVerifyCompletion(appArgs = Array("5"))
+ }
+
+ test("Run SparkPi with custom labels, annotations, and environment variables.") {
+ sparkAppConf
+ .set("spark.kubernetes.driver.label.label1", "label1-value")
+ .set("spark.kubernetes.driver.label.label2", "label2-value")
+ .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value")
+ .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
+ .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
+ .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
+ .set("spark.kubernetes.executor.label.label1", "label1-value")
+ .set("spark.kubernetes.executor.label.label2", "label2-value")
+ .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
+ .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
+ .set("spark.executorEnv.ENV1", "VALUE1")
+ .set("spark.executorEnv.ENV2", "VALUE2")
+
+ runSparkPiAndVerifyCompletion(
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkCustomSettings(driverPod)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ checkCustomSettings(executorPod)
+ })
+ }
+
+ // TODO(ssuchter): Enable the below after debugging
+ // test("Run PageRank using remote data file") {
+ // sparkAppConf
+ // .set("spark.kubernetes.mountDependencies.filesDownloadDir",
+ // CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
+ // .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+ // runSparkPageRankAndVerifyCompletion(
+ // appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
+ // }
+
+ private def runSparkPiAndVerifyCompletion(
+ appResource: String = containerLocalSparkDistroExamplesJar,
+ driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+ executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+ appArgs: Array[String] = Array.empty[String],
+ appLocator: String = appLocator): Unit = {
+ runSparkApplicationAndVerifyCompletion(
+ appResource,
+ SPARK_PI_MAIN_CLASS,
+ Seq("Pi is roughly 3"),
+ appArgs,
+ driverPodChecker,
+ executorPodChecker,
+ appLocator)
+ }
+
+ private def runSparkPageRankAndVerifyCompletion(
+ appResource: String = containerLocalSparkDistroExamplesJar,
+ driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+ executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+ appArgs: Array[String],
+ appLocator: String = appLocator): Unit = {
+ runSparkApplicationAndVerifyCompletion(
+ appResource,
+ SPARK_PAGE_RANK_MAIN_CLASS,
+ Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
+ appArgs,
+ driverPodChecker,
+ executorPodChecker,
+ appLocator)
+ }
+
+ private def runSparkApplicationAndVerifyCompletion(
+ appResource: String,
+ mainClass: String,
+ expectedLogOnCompletion: Seq[String],
+ appArgs: Array[String],
+ driverPodChecker: Pod => Unit,
+ executorPodChecker: Pod => Unit,
+ appLocator: String): Unit = {
+ val appArguments = SparkAppArguments(
+ mainAppResource = appResource,
+ mainClass = mainClass,
+ appArgs = appArgs)
+ SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+
+ val driverPod = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "driver")
+ .list()
+ .getItems
+ .get(0)
+ driverPodChecker(driverPod)
+
+ val executorPods = kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withLabel("spark-app-locator", appLocator)
+ .withLabel("spark-role", "executor")
+ .list()
+ .getItems
+ executorPods.asScala.foreach { pod =>
+ executorPodChecker(pod)
+ }
+
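+    // Poll the driver pod's log until every expected completion message appears, failing on
+    // timeout.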
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ expectedLogOnCompletion.foreach { e =>
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPod.getMetadata.getName)
+ .getLog
+ .contains(e), "The application did not complete.")
+ }
+ }
+ }
+
+ private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
+ assert(driverPod.getMetadata.getName === driverPodName)
+ assert(driverPod.getSpec.getContainers.get(0).getImage === image)
+ assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+ }
+
+ private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+ assert(executorPod.getSpec.getContainers.get(0).getImage === image)
+ assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+ }
+
+ private def checkCustomSettings(pod: Pod): Unit = {
+ assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
+ assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
+ assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
+ assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")
+
+ val container = pod.getSpec.getContainers.get(0)
+ val envVars = container
+ .getEnv
+ .asScala
+ .map { env =>
+ (env.getName, env.getValue)
+ }
+ .toMap
+ assert(envVars("ENV1") === "VALUE1")
+ assert(envVars("ENV2") === "VALUE2")
+ }
+
+ private def deleteDriverPod(): Unit = {
+ kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete()
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ assert(kubernetesTestComponents.kubernetesClient
+ .pods()
+ .withName(driverPodName)
+ .get() == null)
+ }
+ }
+}
+
+private[spark] object KubernetesSuite {
+
+ val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
+ val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+ val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+ val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
+
+ // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+
+ // val REMOTE_PAGE_RANK_DATA_FILE =
+ // "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
+ // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
+ // s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+
+ // case object ShuffleNotReadyException extends Exception
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
new file mode 100644
index 0000000000000..48727142dd052
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.nio.file.{Path, Paths}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.internal.Logging
+
+private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
+
+ val namespaceOption = Option(System.getProperty("spark.kubernetes.test.namespace"))
+ val hasUserSpecifiedNamespace = namespaceOption.isDefined
+ val namespace = namespaceOption.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
+ private val serviceAccountName =
+ Option(System.getProperty("spark.kubernetes.test.serviceAccountName"))
+ .getOrElse("default")
+ val kubernetesClient = defaultClient.inNamespace(namespace)
+ val clientConfig = kubernetesClient.getConfiguration
+
+ def createNamespace(): Unit = {
+ defaultClient.namespaces.createNew()
+ .withNewMetadata()
+ .withName(namespace)
+ .endMetadata()
+ .done()
+ }
+
+ def deleteNamespace(): Unit = {
+ defaultClient.namespaces.withName(namespace).delete()
+ Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
+ val namespaceList = defaultClient
+ .namespaces()
+ .list()
+ .getItems
+ .asScala
+ require(!namespaceList.exists(_.getMetadata.getName == namespace))
+ }
+ }
+
+ def newSparkAppConf(): SparkAppConf = {
+ new SparkAppConf()
+ .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
+ .set("spark.kubernetes.namespace", namespace)
+ .set("spark.executor.memory", "500m")
+ .set("spark.executor.cores", "1")
+      .set("spark.executor.instances", "1")
+ .set("spark.app.name", "spark-test-app")
+ .set("spark.ui.enabled", "true")
+ .set("spark.testing", "false")
+ .set("spark.kubernetes.submission.waitAppCompletion", "false")
+ .set("spark.kubernetes.authenticate.driver.serviceAccountName", serviceAccountName)
+ }
+}
+
+private[spark] class SparkAppConf {
+
+ private val map = mutable.Map[String, String]()
+
+ def set(key: String, value: String): SparkAppConf = {
+ map.put(key, value)
+ this
+ }
+
+ def get(key: String): String = map.getOrElse(key, "")
+
+ def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(","))
+
+ override def toString: String = map.toString
+
+ def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}"))
+}
+
+private[spark] case class SparkAppArguments(
+ mainAppResource: String,
+ mainClass: String,
+ appArgs: Array[String])
+
+private[spark] object SparkAppLauncher extends Logging {
+
+ def launch(
+ appArguments: SparkAppArguments,
+ appConf: SparkAppConf,
+ timeoutSecs: Int,
+ sparkHomeDir: Path): Unit = {
+ val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
+ logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
+ val appArgsArray =
+ if (appArguments.appArgs.length > 0) Array(appArguments.appArgs.mkString(" "))
+ else Array[String]()
+ val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath,
+ "--deploy-mode", "cluster",
+ "--class", appArguments.mainClass,
+ "--master", appConf.get("spark.master")
+ ) ++ appConf.toStringArray :+
+ appArguments.mainAppResource) ++
+ appArgsArray
+ ProcessUtils.executeProcess(commandLine, timeoutSecs)
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
new file mode 100644
index 0000000000000..d8f3a6cec05c3
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+import org.apache.spark.internal.Logging
+
+object ProcessUtils extends Logging {
+ /**
+ * executeProcess is used to run a command and return the output if it
+ * completes within timeout seconds.
+ */
+ def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
+ val pb = new ProcessBuilder().command(fullCommand: _*)
+ pb.redirectErrorStream(true)
+ val proc = pb.start()
+ val outputLines = new ArrayBuffer[String]
+ Utils.tryWithResource(proc.getInputStream)(
+ Source.fromInputStream(_, "UTF-8").getLines().foreach { line =>
+ logInfo(line)
+ outputLines += line
+ })
+ assert(proc.waitFor(timeout, TimeUnit.SECONDS),
+ s"Timed out while executing ${fullCommand.mkString(" ")}")
+ assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}")
+ outputLines
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
new file mode 100644
index 0000000000000..f1fd6dc19ce54
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.SettableFuture
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.internal.readiness.Readiness
+
+private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] {
+
+ private val signal = SettableFuture.create[Boolean]
+
+ override def eventReceived(action: Action, resource: T): Unit = {
+ if ((action == Action.MODIFIED || action == Action.ADDED) &&
+ Readiness.isReady(resource)) {
+ signal.set(true)
+ }
+ }
+
+ override def onClose(cause: KubernetesClientException): Unit = {}
+
+ def waitUntilReady(): Boolean = signal.get(60, TimeUnit.SECONDS)
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
new file mode 100644
index 0000000000000..663f8b6523ac8
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.Closeable
+import java.net.URI
+
+import org.apache.spark.internal.Logging
+
+object Utils extends Logging {
+
+ def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
+ val resource = createResource
+ try f.apply(resource) finally resource.close()
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
new file mode 100644
index 0000000000000..284712c6d250e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest.backend
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+
+private[spark] trait IntegrationTestBackend {
+ def initialize(): Unit
+ def getKubernetesClient: DefaultKubernetesClient
+ def cleanUp(): Unit = {}
+}
+
+private[spark] object IntegrationTestBackendFactory {
+ val deployModeConfigKey = "spark.kubernetes.test.deployMode"
+
+ def getTestBackend: IntegrationTestBackend = {
+ val deployMode = Option(System.getProperty(deployModeConfigKey))
+ .getOrElse("minikube")
+ if (deployMode == "minikube") {
+ MinikubeTestBackend
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid " + deployModeConfigKey + ": " + deployMode)
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
new file mode 100644
index 0000000000000..6494cbc18f33e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
+
+import java.io.File
+import java.nio.file.Paths
+
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+
+import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
+import org.apache.spark.internal.Logging
+
+// TODO support windows
+private[spark] object Minikube extends Logging {
+
+ private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
+
+ def getMinikubeIp: String = {
+ val outputs = executeMinikube("ip")
+ .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
+    assert(outputs.size == 1, "Unexpected number of lines of output from minikube ip")
+ outputs.head
+ }
+
+ def getMinikubeStatus: MinikubeStatus.Value = {
+ val statusString = executeMinikube("status")
+ .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
+ .head
+ .replaceFirst("minikubeVM: ", "")
+ .replaceFirst("minikube: ", "")
+ MinikubeStatus.unapply(statusString)
+ .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
+ }
+
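+  // Builds a client for the Minikube VM's API server using the certificates that Minikube
+  // generates under ~/.minikube.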
+ def getKubernetesClient: DefaultKubernetesClient = {
+ val kubernetesMaster = s"https://${getMinikubeIp}:8443"
+ val userHome = System.getProperty("user.home")
+ val kubernetesConf = new ConfigBuilder()
+ .withApiVersion("v1")
+ .withMasterUrl(kubernetesMaster)
+ .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
+ .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
+ .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
+ .build()
+ new DefaultKubernetesClient(kubernetesConf)
+ }
+
+ private def executeMinikube(action: String, args: String*): Seq[String] = {
+ ProcessUtils.executeProcess(
+ Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+ }
+}
+
+private[spark] object MinikubeStatus extends Enumeration {
+
+ // The following states are listed according to
+ // https://github.com/docker/machine/blob/master/libmachine/state/state.go.
+ val STARTING = status("Starting")
+ val RUNNING = status("Running")
+ val PAUSED = status("Paused")
+ val STOPPING = status("Stopping")
+ val STOPPED = status("Stopped")
+ val ERROR = status("Error")
+ val TIMEOUT = status("Timeout")
+ val SAVED = status("Saved")
+ val NONE = status("")
+
+ def status(value: String): Value = new Val(nextId, value)
+ def unapply(s: String): Option[Value] = values.find(s == _.toString)
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
new file mode 100644
index 0000000000000..cb9324179d70e
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
+
+private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
+
+ private var defaultClient: DefaultKubernetesClient = _
+
+ override def initialize(): Unit = {
+ val minikubeStatus = Minikube.getMinikubeStatus
+ require(minikubeStatus == MinikubeStatus.RUNNING,
+ s"Minikube must be running to use the Minikube backend for integration tests." +
+ s" Current status is: $minikubeStatus.")
+ defaultClient = Minikube.getKubernetesClient
+ }
+
+ override def cleanUp(): Unit = {
+ super.cleanUp()
+ }
+
+ override def getKubernetesClient: DefaultKubernetesClient = {
+ defaultClient
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
new file mode 100644
index 0000000000000..a81ef455c6766
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+package object config {
+ def getTestImageTag: String = {
+ val imageTagFileProp = System.getProperty("spark.kubernetes.test.imageTagFile")
+ require(imageTagFileProp != null, "Image tag file must be provided in system properties.")
+ val imageTagFile = new File(imageTagFileProp)
+ require(imageTagFile.isFile, s"No file found for image tag at ${imageTagFile.getAbsolutePath}.")
+ Files.toString(imageTagFile, Charsets.UTF_8).trim
+ }
+
+ def getTestImageRepo: String = {
+ val imageRepo = System.getProperty("spark.kubernetes.test.imageRepo")
+ require(imageRepo != null, "Image repo must be provided in system properties.")
+ imageRepo
+ }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
new file mode 100644
index 0000000000000..0807a68cd823c
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+package object constants {
+ val MINIKUBE_TEST_BACKEND = "minikube"
+ val GCE_TEST_BACKEND = "gce"
+}