Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions resource-managers/kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ build/mvn integration-test \
-pl resource-managers/kubernetes/integration-tests -am
```

# Running against an arbitrary cluster

In order to run against any cluster, use the following:
build/mvn integration-test \
-Pkubernetes -Pkubernetes-integration-tests \
-pl resource-managers/kubernetes/integration-tests -am
-DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"

# Preserve the Minikube VM

The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.internal.Logging

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.{Minutes, Seconds, Span}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}

private[spark] class KubernetesSuite extends SparkFunSuite {
private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend()

override def beforeAll(): Unit = {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
testBackend.initialize()
}

override def afterAll(): Unit = {
if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
}
testBackend.cleanUp()
}

override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
Vector(
new KubernetesV1Suite,
new KubernetesV2Suite)
new KubernetesV1Suite(testBackend),
new KubernetesV2Suite(testBackend))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,37 @@
package org.apache.spark.deploy.kubernetes.integrationtest

import java.util.UUID
import javax.net.ssl.X509TrustManager

import org.scalatest.concurrent.Eventually
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.internal.SSLUtils
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil

private[spark] class KubernetesTestComponents {
private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {

val namespace = UUID.randomUUID().toString.replaceAll("-", "")
val kubernetesClient = Minikube.getKubernetesClient.inNamespace(namespace)
val kubernetesClient = defaultClient.inNamespace(namespace)
val clientConfig = kubernetesClient.getConfiguration

def createNamespace(): Unit = {
Minikube.getKubernetesClient.namespaces.createNew()
defaultClient.namespaces.createNew()
.withNewMetadata()
.withName(namespace)
.endMetadata()
.done()
}

def deleteNamespace(): Unit = {
Minikube.getKubernetesClient.namespaces.withName(namespace).delete()
defaultClient.namespaces.withName(namespace).delete()
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
val namespaceList = Minikube.getKubernetesClient
val namespaceList = defaultClient
.namespaces()
.list()
.getItems()
Expand All @@ -53,13 +58,12 @@ private[spark] class KubernetesTestComponents {

def newSparkConf(): SparkConf = {
new SparkConf(true)
.setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443")
.set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile)
.set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile)
.set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile)
.setMaster(s"k8s://${kubernetesClient.getMasterUrl}")
.set(KUBERNETES_NAMESPACE, namespace)
.set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest")
.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor:latest"))
.setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath))
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
Expand All @@ -69,4 +73,26 @@ private[spark] class KubernetesTestComponents {
.set("spark.testing", "false")
.set(WAIT_FOR_APP_COMPLETION, false)
}
}

def getService[T: ClassTag](
serviceName: String,
namespace: String,
servicePortName: String,
servicePath: String = ""): T = synchronized {
val kubernetesMaster = s"${defaultClient.getMasterUrl}"

val url = s"${
Array[String](
s"${kubernetesClient.getMasterUrl}",
"api", "v1", "proxy",
"namespaces", namespace,
"services", serviceName).mkString("/")
}" +
s":$servicePortName$servicePath"
val userHome = System.getProperty("user.home")
val kubernetesConf = kubernetesClient.getConfiguration
val sslContext = SSLUtils.sslContext(kubernetesConf)
val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager]
HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,37 @@ package org.apache.spark.deploy.kubernetes.integrationtest

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.google.common.collect.ImmutableList
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
import org.scalatest.concurrent.Eventually
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND}
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils

@DoNotDiscover
private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter {
private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
kubernetesTestComponents.createNamespace()
}

Expand Down Expand Up @@ -85,7 +89,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
.get(0)
.getMetadata
.getName
Minikube.getService[SparkRestApiV1](serviceName,
kubernetesTestComponents.getService[SparkRestApiV1](serviceName,
kubernetesTestComponents.namespace, "spark-ui-port")
}

Expand Down Expand Up @@ -168,6 +172,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Enable SSL on the driver submit server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
Minikube.getMinikubeIp,
"changeit",
Expand All @@ -188,6 +194,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Enable SSL on the driver submit server using PEM files") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
Expand All @@ -201,6 +209,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Added files should exist on the driver.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
sparkConf.setAppName("spark-file-existence-test")
val podCompletedFuture = SettableFuture.create[Boolean]
Expand Down Expand Up @@ -257,6 +267,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Use external URI provider") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val externalUriProviderWatch =
new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services()
Expand Down Expand Up @@ -288,6 +300,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Mount the Kubernetes credentials onto the driver pod") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
kubernetesTestComponents.clientConfig.getCaCertFile)
sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ import java.util.UUID
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
import org.apache.spark._
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl}

@DoNotDiscover
private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter {
private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _
private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
}
Expand All @@ -54,11 +57,15 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
}

test("Use submission v2.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

launchStagingServer(SSLOptions())
runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
}

test("Enable SSL on the submission server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
ipAddress = Minikube.getMinikubeIp,
keyStorePassword = "keyStore",
Expand All @@ -81,13 +88,17 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
}

test("Use container-local resources without the resource staging server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.setJars(Seq(
KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH))
runSparkAppAndVerifyCompletion(KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE)
}

private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
resourceStagingServerSslOptions)
val resourceStagingServerUriScheme = if (resourceStagingServerSslOptions.enabled) {
Expand All @@ -96,7 +107,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
"http"
}
sparkConf.set(RESOURCE_STAGING_SERVER_URI,
s"$resourceStagingServerUriScheme://${Minikube.getMinikubeIp}:$resourceStagingServerPort")
s"$resourceStagingServerUriScheme://" +
s"${Minikube.getMinikubeIp}:$resourceStagingServerPort")
}

private def runSparkAppAndVerifyCompletion(appResource: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest

import java.io.{BufferedReader, InputStreamReader}
import java.util.concurrent.TimeUnit

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

object ProcessUtils extends Logging {
/**
* executeProcess is used to run a command and return the output if it
* completes within timeout seconds.
*/
def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
val pb = new ProcessBuilder().command(fullCommand: _*)
pb.redirectErrorStream(true)
val proc = pb.start()
val outputLines = new ArrayBuffer[String]

Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput =>
Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) =>
var line: String = null
do {
line = bufferedOutput.readLine()
if (line != null) {
logInfo(line)
outputLines += line
}
} while (line != null)
}
}
assert(proc.waitFor(timeout, TimeUnit.SECONDS),
s"Timed out while executing ${fullCommand.mkString(" ")}")
assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}")
outputLines.toSeq
}
}
Loading