Merged
e2e/runner.sh (2 additions, 1 deletion)
@@ -112,6 +112,7 @@
$root/spark/build/mvn clean -Ddownload.plugin.skip=true integration-test \
-Dspark-distro-tgz=$root/spark/*.tgz \
-DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://$MASTER \
-Dspark.docker.test.driverImage=$IMAGE_REPO/spark-driver:$tag \
-Dspark.docker.test.executorImage=$IMAGE_REPO/spark-executor:$tag" || :
-Dspark.docker.test.executorImage=$IMAGE_REPO/spark-executor:$tag \
-Dspark.docker.test.initContainerImage=$IMAGE_REPO/spark-init:$tag" || :

echo "TEST SUITE FINISHED"
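
The new init-container image property reaches the suite the same way as the existing driver and executor image properties: Maven forwards the -D flag to the test JVM, where it is read with a local default. A minimal sketch of the consuming side, mirroring the `System.getProperty` call that appears in KubernetesSuite below:

```scala
// Read the image name forwarded by e2e/runner.sh via -D, falling back to
// the locally built default tag when the property is unset.
val initContainerImage =
  System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest")
```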
KubernetesSuite.scala
@@ -24,7 +24,7 @@
import java.util.regex.Pattern
import scala.collection.JavaConverters._

import com.google.common.io.PatternFilenameFilter
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.{Container, Pod}
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
@@ -52,6 +52,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
before {
sparkAppConf = kubernetesTestComponents.newSparkAppConf()
.set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL)
.set("spark.kubernetes.executor.label.spark-app-locator", APP_LOCATOR_LABEL)
kubernetesTestComponents.createNamespace()
}

@@ -70,10 +71,25 @@

test("Run SparkPi with a master URL without a scheme.") {
val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
sparkAppConf.set("spark.master", s"k8s://${url.getHost}:${url.getPort}")
val k8sMasterUrl = if (url.getPort < 0) {
s"k8s://${url.getHost}"
} else {
s"k8s://${url.getHost}:${url.getPort}"
}
sparkAppConf.set("spark.master", k8sMasterUrl)
runSparkPiAndVerifyCompletion()
}
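
The scheme-less master URL fix accounts for `java.net.URL.getPort` returning -1 when the URL carries no explicit port (fabric8's `getMasterUrl` returns a `java.net.URL`); interpolating the port blindly would yield an invalid master like `k8s://host:-1`. A quick illustration with hypothetical hosts:

```scala
import java.net.URL

// getPort is -1 when the URL has no explicit port.
val noPort = new URL("https://kubernetes.default.svc")
assert(noPort.getPort == -1)

val withPort = new URL("https://192.168.99.100:8443")
assert(withPort.getPort == 8443)
```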

test("Run SparkPi with an argument.") {
runSparkPiAndVerifyCompletion(appArgs = Array("5"))
Review comment (Member): [comment text not captured]
Reply (Member Author): Good catch! Fixed.
}

test("Run SparkPi using the remote example jar.") {
sparkAppConf.set("spark.kubernetes.initContainer.image",
System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest"))
runSparkPiAndVerifyCompletion(appResource = REMOTE_EXAMPLES_JAR_URI)
}

test("Run SparkPi with custom driver pod name, labels, annotations, and environment variables.") {
sparkAppConf
.set("spark.kubernetes.driver.pod.name", "spark-integration-spark-pi")
@@ -83,56 +99,109 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
.set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value")
.set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
.set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
runSparkPiAndVerifyCompletion(driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
assert(driverPod.getMetadata.getName === "spark-integration-spark-pi")

assert(driverPod.getMetadata.getLabels.get("label1") === "label1-value")
assert(driverPod.getMetadata.getLabels.get("label2") === "label2-value")
assert(driverPod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
assert(driverPod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")

val driverContainer = driverPod.getSpec.getContainers.get(0)
val envVars = driverContainer
.getEnv
.asScala
.map { env =>
(env.getName, env.getValue)
}
.toMap
assert(envVars("ENV1") === "VALUE1")
assert(envVars("ENV2") === "VALUE2")
})
.set("spark.kubernetes.executor.label.label1", "label1-value")
.set("spark.kubernetes.executor.label.label2", "label2-value")
.set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value")
.set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value")
.set("spark.executorEnv.ENV1", "VALUE1")
.set("spark.executorEnv.ENV2", "VALUE2")

runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
assert(driverPod.getMetadata.getName === "spark-integration-spark-pi")
checkCustomSettings(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkCustomSettings(executorPod)
})
}

test("Run SparkPi with a test secret mounted into the driver and executor pods") {
createTestSecret()
sparkAppConf
.set(s"spark.kubernetes.driver.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
.set(s"spark.kubernetes.executor.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkTestSecret(driverPod)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkTestSecret(executorPod)
})
}
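
For context, the secret checkers further down assert the pod shape that `spark.kubernetes.driver.secrets.*` and `spark.kubernetes.executor.secrets.*` produce: a secret-backed volume named `<secret-name>-volume` plus a matching mount in the first container. A hedged sketch of that expected shape using the standard fabric8 model builders, with names taken from the test constants:

```scala
import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

// Roughly the pod shape checkTestSecret expects to find.
val expectedShape: Pod = new PodBuilder()
  .withNewSpec()
    .addNewVolume()
      .withName("test-secret-volume") // s"$TEST_SECRET_NAME-volume"
      .withNewSecret().withSecretName("test-secret").endSecret()
    .endVolume()
    .addNewContainer()
      .withName("spark-kubernetes-driver")
      .addNewVolumeMount()
        .withName("test-secret-volume")
        .withMountPath("/etc/secrets") // TEST_SECRET_MOUNT_PATH
      .endVolumeMount()
    .endContainer()
  .endSpec()
  .build()
```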

test("Run SparkPi using the remote example jar with a test secret mounted into the driver and " +
Review comment (Member): Is this test using the remote example jar? I can't find it being referenced in the test itself.
Reply (Member Author): Fixed.
"executor pods") {
sparkAppConf
.set(s"spark.kubernetes.driver.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
.set(s"spark.kubernetes.executor.secrets.$TEST_SECRET_NAME", TEST_SECRET_MOUNT_PATH)
sparkAppConf.set("spark.kubernetes.initContainer.image",
System.getProperty("spark.docker.test.initContainerImage", "spark-init:latest"))

createTestSecret()

runSparkPiAndVerifyCompletion(
appResource = REMOTE_EXAMPLES_JAR_URI,
driverPodChecker = (driverPod: Pod) => {
doBasicDriverPodCheck(driverPod)
checkTestSecret(driverPod, withInitContainer = true)
},
executorPodChecker = (executorPod: Pod) => {
doBasicExecutorPodCheck(executorPod)
checkTestSecret(executorPod, withInitContainer = true)
})
}

private def runSparkPiAndVerifyCompletion(
appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck): Unit = {
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
appArgs: Array[String] = Array.empty[String]): Unit = {
runSparkApplicationAndVerifyCompletion(
appResource,
SPARK_PI_MAIN_CLASS,
Seq("Pi is roughly 3"),
Array.empty[String],
driverPodChecker)
appArgs,
driverPodChecker,
executorPodChecker)
}

private def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
expectedLogOnCompletion: Seq[String],
appArgs: Array[String],
driverPodChecker: Pod => Unit): Unit = {
driverPodChecker: Pod => Unit,
executorPodChecker: Pod => Unit): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass)
mainClass = mainClass,
appArgs = appArgs)
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt)

val driverPod = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", APP_LOCATOR_LABEL)
.withLabel("spark-role", "driver")
.list()
.getItems
.get(0)
driverPodChecker(driverPod)

val executorPods = kubernetesTestComponents.kubernetesClient
.pods()
.withLabel("spark-app-locator", APP_LOCATOR_LABEL)
.withLabel("spark-role", "executor")
.list()
.getItems
executorPods.asScala.foreach { pod =>
executorPodChecker(pod)
}

Eventually.eventually(TIMEOUT, INTERVAL) {
expectedLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
@@ -145,7 +214,64 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit
}
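
The fold above truncates the log-polling block; from the visible prefix it repeatedly checks the driver log for each expected line until TIMEOUT expires. A plausible completion of the truncated body, assuming fabric8's `PodResource.getLog`:

```scala
// Poll the driver pod's log until every expected completion line shows up.
Eventually.eventually(TIMEOUT, INTERVAL) {
  expectedLogOnCompletion.foreach { e =>
    assert(kubernetesTestComponents.kubernetesClient
      .pods()
      .withName(driverPod.getMetadata.getName)
      .getLog
      .contains(e))
  }
}
```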

private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
assert(driverPod.getMetadata.getLabels.get("spark-role") === "driver")
assert(driverPod.getSpec.getContainers.get(0).getImage === "spark-driver:latest")
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
}

private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
assert(executorPod.getSpec.getContainers.get(0).getImage === "spark-executor:latest")
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
}

private def checkCustomSettings(pod: Pod): Unit = {
assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value")
assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value")

val container = pod.getSpec.getContainers.get(0)
val envVars = container
.getEnv
.asScala
.map { env =>
(env.getName, env.getValue)
}
.toMap
assert(envVars("ENV1") === "VALUE1")
assert(envVars("ENV2") === "VALUE2")
}

private def createTestSecret(): Unit = {
testBackend.getKubernetesClient.secrets
.createNew()
.editOrNewMetadata()
.withName(TEST_SECRET_NAME)
.withNamespace(kubernetesTestComponents.namespace)
.endMetadata()
.addToStringData(TEST_SECRET_KEY, TEST_SECRET_VALUE)
.done()
}
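
`addToStringData` writes the value under the secret's `stringData` field, which the API server stores base64-encoded under `data`. A hedged sketch of reading it back through the same client, where decoding is up to the caller:

```scala
import java.util.Base64

// Fetch the secret just created and decode its value from .data.
val secret = testBackend.getKubernetesClient
  .secrets()
  .inNamespace(kubernetesTestComponents.namespace)
  .withName(TEST_SECRET_NAME)
  .get()
val decoded = new String(
  Base64.getDecoder.decode(secret.getData.get(TEST_SECRET_KEY)), "UTF-8")
assert(decoded == TEST_SECRET_VALUE)
```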

private def checkTestSecret(pod: Pod, withInitContainer: Boolean = false): Unit = {
val testSecretVolume = pod.getSpec.getVolumes.asScala.filter { volume =>
volume.getName == s"$TEST_SECRET_NAME-volume"
}
assert(testSecretVolume.size === 1)
assert(testSecretVolume.head.getSecret.getSecretName === TEST_SECRET_NAME)

checkTestSecretInContainer(pod.getSpec.getContainers.get(0))

if (withInitContainer) {
Review comment (Member): Is this option being exercised currently?
Reply (@liyinan926, Member Author, Jan 4, 2018): No, none of the tests uses an init-container yet.
checkTestSecretInContainer(pod.getSpec.getInitContainers.get(0))
}
}

private def checkTestSecretInContainer(container: Container): Unit = {
val testSecret = container.getVolumeMounts.asScala.filter { mount =>
mount.getName == s"$TEST_SECRET_NAME-volume"
}
assert(testSecret.size === 1)
assert(testSecret.head.getMountPath === TEST_SECRET_MOUNT_PATH)
}
}

@@ -161,5 +287,13 @@ private[spark] object KubernetesSuite {
s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}"
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"

val TEST_SECRET_NAME = "test-secret"
val TEST_SECRET_KEY = "test-key"
val TEST_SECRET_VALUE = "test-data"
val TEST_SECRET_MOUNT_PATH = "/etc/secrets"

val REMOTE_EXAMPLES_JAR_URI =
"https://storage.googleapis.com/spark-k8s-integration-tests/jars/spark-examples_2.11-2.3.0.jar"

case object ShuffleNotReadyException extends Exception
}
KubernetesTestComponents.scala
@@ -47,7 +47,7 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
val namespaceList = defaultClient
.namespaces()
.list()
.getItems()
.getItems
.asScala
require(!namespaceList.exists(_.getMetadata.getName == namespace))
}
@@ -91,7 +91,8 @@ private[spark] class SparkAppConf {

private[spark] case class SparkAppArguments(
mainAppResource: String,
mainClass: String)
mainClass: String,
appArgs: Array[String])

private[spark] object SparkAppLauncher extends Logging {

@@ -104,7 +105,9 @@ private[spark] object SparkAppLauncher extends Logging {
"--deploy-mode", "cluster",
"--class", appArguments.mainClass,
"--master", appConf.get("spark.master")
) ++ appConf.toStringArray :+ appArguments.mainAppResource
) ++ appConf.toStringArray :+
appArguments.mainAppResource :+
appArguments.appArgs.mkString(" ")
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
ProcessUtils.executeProcess(commandLine, timeoutSecs)
}
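
One subtlety in the launcher change: `appArgs.mkString(" ")` appends all application arguments as a single space-joined element after the main resource. That is fine for the single-argument SparkPi test, but multiple arguments would arrive as one token. A small illustration with hypothetical values:

```scala
val base = Seq(
  "spark-submit",
  "--class", "org.apache.spark.examples.SparkPi",
  "--master", "k8s://https://192.168.99.100:8443")
val commandLine = base :+
  "local:///opt/spark/examples/jars/spark-examples.jar" :+
  Array("5").mkString(" ")
// commandLine ends with the app resource followed by the single element "5".
// With Array("100", "10") the final element would be the one token "100 10".
```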
IntegrationTestBackend.scala
@@ -25,7 +25,7 @@ import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTest
private[spark] trait IntegrationTestBackend {
def name(): String
def initialize(): Unit
def getKubernetesClient(): DefaultKubernetesClient
def getKubernetesClient: DefaultKubernetesClient
def cleanUp(): Unit = {}
}
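
Dropping the empty parentheses on `getKubernetesClient` follows the Scala uniform-access convention: side-effect-free accessors are declared without `()`, while effectful methods such as `initialize()` and `cleanUp()` keep them. A tiny illustration:

```scala
trait Backend {
  def client: String     // pure accessor: no parentheses
  def initialize(): Unit // side-effecting: keeps parentheses
}
```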

SparkDockerImageBuilder.scala
@@ -28,14 +28,15 @@ import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH
import org.apache.spark.deploy.k8s.integrationtest.Logging

private[spark] class SparkDockerImageBuilder
(private val dockerEnv: Map[String, String]) extends Logging{
(private val dockerEnv: Map[String, String]) extends Logging {

private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH
// Dockerfile paths must be relative to the build path.
private val DOCKERFILES_DIR = "kubernetes/dockerfiles/"
private val BASE_DOCKER_FILE = DOCKERFILES_DIR + "spark-base/Dockerfile"
private val DRIVER_DOCKER_FILE = DOCKERFILES_DIR + "driver/Dockerfile"
private val EXECUTOR_DOCKER_FILE = DOCKERFILES_DIR + "executor/Dockerfile"
private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile"
private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST",
@@ -64,6 +65,7 @@ private[spark] class SparkDockerImageBuilder
buildImage("spark-base", BASE_DOCKER_FILE)
buildImage("spark-driver", DRIVER_DOCKER_FILE)
buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
}

private def buildImage(name: String, dockerFile: String): Unit = {