Skip to content
This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.internal.Logging

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.{Minutes, Seconds, Span}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}

private[spark] class KubernetesSuite extends SparkFunSuite {
private var kubernetesTestClient: KubernetesTestClient = _
private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend()

override def beforeAll(): Unit = {
kubernetesTestClient = new KubernetesTestClient()
testBackend.initialize()
}

override def afterAll(): Unit = {
kubernetesTestClient.cleanUp()
testBackend.cleanUp()
}

override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
Vector(
new KubernetesV1Suite(kubernetesTestClient),
new KubernetesV2Suite(kubernetesTestClient))
new KubernetesV1Suite(testBackend),
new KubernetesV2Suite(testBackend))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil

private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
Expand Down Expand Up @@ -109,45 +109,4 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
field.setAccessible(false)
}
}
}

object KubernetesTestBackend extends Enumeration {
val SingleNode, MultiNode = Value
}

/**
* Creates and holds a Kubernetes client for executing tests.
* When master and driver/executor images are supplied, we return a client
* for that cluster. By default, we return a Minikube client
*/

class KubernetesTestClient {
var defaultClient: DefaultKubernetesClient = _
var testBackend: KubernetesTestBackend.Value = _

Option(System.getProperty("spark.kubernetes.test.master")).map {
master =>
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(resolveK8sMaster(master))
val k8ClientConfig = k8ConfBuilder.build
defaultClient = new DefaultKubernetesClient(k8ClientConfig)
testBackend = KubernetesTestBackend.MultiNode
}.getOrElse {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
defaultClient = Minikube.getKubernetesClient
testBackend = KubernetesTestBackend.SingleNode
}

def getClient(): DefaultKubernetesClient = {
defaultClient
}

def cleanUp(): Unit = {
if (testBackend == KubernetesTestBackend.SingleNode
&& !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,23 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND}
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils

@DoNotDiscover
private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClient)
private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient())
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
kubernetesTestComponents.createNamespace()
}

Expand Down Expand Up @@ -170,7 +172,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Enable SSL on the driver submit server") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
Minikube.getMinikubeIp,
Expand All @@ -192,7 +194,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Enable SSL on the driver submit server using PEM files") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
Expand All @@ -207,7 +209,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Added files should exist on the driver.") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
sparkConf.setAppName("spark-file-existence-test")
Expand Down Expand Up @@ -265,7 +267,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Use external URI provider") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val externalUriProviderWatch =
new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
Expand Down Expand Up @@ -298,7 +300,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
}

test("Mount the Kubernetes credentials onto the driver pod") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
kubernetesTestComponents.clientConfig.getCaCertFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import java.util.UUID
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite}
import org.apache.spark._
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl}

@DoNotDiscover
private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClient)
private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
Expand All @@ -37,7 +39,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient())
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
}
Expand All @@ -55,14 +57,14 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
}

test("Use submission v2.") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

launchStagingServer(SSLOptions())
runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
}

test("Enable SSL on the submission server") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
ipAddress = Minikube.getMinikubeIp,
Expand All @@ -86,7 +88,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
}

test("Use container-local resources without the resource staging server") {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.setJars(Seq(
KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
Expand All @@ -95,7 +97,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien
}

private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
resourceStagingServerSslOptions)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.constants.GCE_TEST_BACKEND

private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _

override def initialize(): Unit = {
var k8ConfBuilder = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(resolveK8sMaster(master))
defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build)
}

override def getKubernetesClient(): DefaultKubernetesClient = {
defaultClient
}

override def name(): String = GCE_TEST_BACKEND
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.kubernetes.integrationtest.backend

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE.GCETestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder

private[spark] trait IntegrationTestBackend {
def name(): String
def initialize(): Unit
def getKubernetesClient(): DefaultKubernetesClient
def cleanUp(): Unit = {}
}

private[spark] object IntegrationTestBackendFactory {
def getTestBackend(): IntegrationTestBackend = {
Option(System.getProperty("spark.kubernetes.test.master")).map {
master =>
new GCETestBackend(master)
}.getOrElse {
new MinikubeTestBackend()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.minikube
package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube

import java.nio.file.Paths

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder

private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
private var defaultClient: DefaultKubernetesClient = _

override def initialize(): Unit = {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
defaultClient = Minikube.getKubernetesClient
}

override def getKubernetesClient(): DefaultKubernetesClient = {
defaultClient
}

override def cleanUp(): Unit = {
if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
}
}

override def name(): String = MINIKUBE_TEST_BACKEND


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest

package object constants {
val MINIKUBE_TEST_BACKEND = "minikube"
val GCE_TEST_BACKEND = "gce"
}