diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala index 3199a8c385f95..f402d240bfc33 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala @@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube import org.apache.spark.internal.Logging /** diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index e90c190b60224..bd5ff7a005d46 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -25,22 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} private[spark] class KubernetesSuite extends SparkFunSuite { - private var kubernetesTestClient: KubernetesTestClient = _ + private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend() override def beforeAll(): Unit = { - kubernetesTestClient = new KubernetesTestClient() + testBackend.initialize() } override def afterAll(): Unit = { - kubernetesTestClient.cleanUp() + testBackend.cleanUp() } override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = { Vector( - new KubernetesV1Suite(kubernetesTestClient), - new KubernetesV2Suite(kubernetesTestClient)) + new KubernetesV1Suite(testBackend), + new KubernetesV2Suite(testBackend)) } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala index 065f8033dd913..d536000f84c5a 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala @@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { @@ -109,45 +109,4 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl field.setAccessible(false) } } -} - -object KubernetesTestBackend extends Enumeration { - val SingleNode, MultiNode = Value -} - -/** - * Creates and holds a Kubernetes client for executing tests. - * When master and driver/executor images are supplied, we return a client - * for that cluster. By default, we return a Minikube client - */ - -class KubernetesTestClient { - var defaultClient: DefaultKubernetesClient = _ - var testBackend: KubernetesTestBackend.Value = _ - - Option(System.getProperty("spark.kubernetes.test.master")).map { - master => - var k8ConfBuilder = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(resolveK8sMaster(master)) - val k8ClientConfig = k8ConfBuilder.build - defaultClient = new DefaultKubernetesClient(k8ClientConfig) - testBackend = KubernetesTestBackend.MultiNode - }.getOrElse { - Minikube.startMinikube() - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() - defaultClient = Minikube.getKubernetesClient - testBackend = KubernetesTestBackend.SingleNode - } - - def getClient(): DefaultKubernetesClient = { - defaultClient - } - - def cleanUp(): Unit = { - if (testBackend == KubernetesTestBackend.SingleNode - && !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { - Minikube.deleteMinikube() - } - } -} +} \ No newline at end of file diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala index 63031e211b33f..4cbd074547915 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala @@ -32,21 +32,23 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND} import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1 import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager} import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import org.apache.spark.util.Utils @DoNotDiscover -private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClient) +private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend) extends SparkFunSuite with BeforeAndAfter { private var kubernetesTestComponents: KubernetesTestComponents = _ private var sparkConf: SparkConf = _ override def beforeAll(): Unit = { - kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient()) + kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) kubernetesTestComponents.createNamespace() } @@ -170,7 +172,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien } test("Enable SSL on the driver submit server") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( Minikube.getMinikubeIp, @@ -192,7 +194,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien } test("Enable SSL on the driver submit server using PEM files") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}") @@ -207,7 +209,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien } test("Added files should exist on the driver.") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath) sparkConf.setAppName("spark-file-existence-test") @@ -265,7 +267,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien } test("Use external URI provider") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) val externalUriProviderWatch = new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient) @@ -298,7 +300,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien } test("Mount the Kubernetes credentials onto the driver pod") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, kubernetesTestComponents.clientConfig.getCaCertFile) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 4fca10bf30138..8fa7cbd52ee83 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -21,14 +21,16 @@ import java.util.UUID import org.scalatest.{BeforeAndAfter, DoNotDiscover} import org.scalatest.concurrent.Eventually -import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl} @DoNotDiscover -private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClient) +private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) extends SparkFunSuite with BeforeAndAfter { private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") @@ -37,7 +39,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _ override def beforeAll(): Unit = { - kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient()) + kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) resourceStagingServerLauncher = new ResourceStagingServerLauncher( kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) } @@ -55,14 +57,14 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien } test("Use submission v2.") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) launchStagingServer(SSLOptions()) runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } test("Enable SSL on the submission server") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( ipAddress = Minikube.getMinikubeIp, @@ -86,7 +88,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien } test("Use container-local resources without the resource staging server") { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) sparkConf.setJars(Seq( KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, @@ -95,7 +97,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClien } private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { - assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) + assume(testBackend.name == MINIKUBE_TEST_BACKEND) val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer( resourceStagingServerSslOptions) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala new file mode 100644 index 0000000000000..1ef096be4af02 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster +import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.kubernetes.integrationtest.constants.GCE_TEST_BACKEND + +private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend { + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(resolveK8sMaster(master)) + defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build) + } + + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + + override def name(): String = GCE_TEST_BACKEND +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala new file mode 100644 index 0000000000000..44030adb47207 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.kubernetes.integrationtest.backend + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE.GCETestBackend +import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend} +import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder + +private[spark] trait IntegrationTestBackend { + def name(): String + def initialize(): Unit + def getKubernetesClient(): DefaultKubernetesClient + def cleanUp(): Unit = {} +} + +private[spark] object IntegrationTestBackendFactory { + def getTestBackend(): IntegrationTestBackend = { + Option(System.getProperty("spark.kubernetes.test.master")).map { + master => + new GCETestBackend(master) + }.getOrElse { + new MinikubeTestBackend() + } + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala similarity index 98% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala rename to resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala index 69b3ff2784796..7c4b344e8f72b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.kubernetes.integrationtest.minikube +package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube import java.nio.file.Paths diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala new file mode 100644 index 0000000000000..6e0049b813719 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND +import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder + +private[spark] class MinikubeTestBackend extends IntegrationTestBackend { + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + defaultClient = Minikube.getKubernetesClient + } + + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + + override def cleanUp(): Unit = { + if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } + + override def name(): String = MINIKUBE_TEST_BACKEND + + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala new file mode 100644 index 0000000000000..8207198b529d2 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +package object constants { + val MINIKUBE_TEST_BACKEND = "minikube" + val GCE_TEST_BACKEND = "gce" +} \ No newline at end of file