diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md
index d70c38fdc64d5..fd1ad29eb795d 100644
--- a/resource-managers/kubernetes/README.md
+++ b/resource-managers/kubernetes/README.md
@@ -61,6 +61,22 @@ build/mvn integration-test \
   -pl resource-managers/kubernetes/integration-tests -am
 ```
+
+# Running against an arbitrary cluster
+
+To run against an arbitrary cluster, point the tests at its API server and
+provide pre-built driver and executor docker images:
+
+```
+build/mvn integration-test \
+  -Pkubernetes -Pkubernetes-integration-tests \
+  -pl resource-managers/kubernetes/integration-tests -am \
+  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage="
+```
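+
+If the image properties are omitted, the images default to `spark-driver:latest`
+and `spark-executor:latest`. Tests that can only run against Minikube are
+skipped when targeting any other cluster.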
 
 # Preserve the Minikube VM
 
 The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala
index 3199a8c385f95..f402d240bfc33 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala
@@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.JavaConverters._
 
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
 import org.apache.spark.internal.Logging
 
 /**
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index abbf7e4d5ce1b..bd5ff7a005d46 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -25,26 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 
 private[spark] class KubernetesSuite extends SparkFunSuite {
+  private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend()
 
   override def beforeAll(): Unit = {
-    Minikube.startMinikube()
-    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+    testBackend.initialize()
   }
 
   override def afterAll(): Unit = {
-    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
-      Minikube.deleteMinikube()
-    }
+    testBackend.cleanUp()
   }
 
   override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
     Vector(
-      new KubernetesV1Suite,
-      new KubernetesV2Suite)
+      new KubernetesV1Suite(testBackend),
+      new KubernetesV2Suite(testBackend))
   }
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
index 53e02f9e479c1..8cdacee655c05 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
@@ -17,22 +17,27 @@
 package org.apache.spark.deploy.kubernetes.integrationtest
 
 import java.util.UUID
+import javax.net.ssl.X509TrustManager
 
-import org.scalatest.concurrent.Eventually
 import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import io.fabric8.kubernetes.client.internal.SSLUtils
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil
 
-private[spark] class KubernetesTestComponents {
+private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
 
   val namespace = UUID.randomUUID().toString.replaceAll("-", "")
-  val kubernetesClient = Minikube.getKubernetesClient.inNamespace(namespace)
+  val kubernetesClient = defaultClient.inNamespace(namespace)
   val clientConfig = kubernetesClient.getConfiguration
 
   def createNamespace(): Unit = {
-    Minikube.getKubernetesClient.namespaces.createNew()
+    defaultClient.namespaces.createNew()
       .withNewMetadata()
       .withName(namespace)
       .endMetadata()
@@ -40,9 +45,9 @@ private[spark] class KubernetesTestComponents {
   }
 
   def deleteNamespace(): Unit = {
-    Minikube.getKubernetesClient.namespaces.withName(namespace).delete()
+    defaultClient.namespaces.withName(namespace).delete()
     Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
-      val namespaceList = Minikube.getKubernetesClient
+      val namespaceList = defaultClient
         .namespaces()
         .list()
         .getItems()
@@ -53,13 +58,12 @@
 
   def newSparkConf(): SparkConf = {
     new SparkConf(true)
-      .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443")
-      .set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile)
-      .set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile)
-      .set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile)
+      .setMaster(s"k8s://${kubernetesClient.getMasterUrl}")
       .set(KUBERNETES_NAMESPACE, namespace)
-      .set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
-      .set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest")
+      .set(DRIVER_DOCKER_IMAGE,
+        System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
+      .set(EXECUTOR_DOCKER_IMAGE,
+        System.getProperty("spark.docker.test.executorImage", "spark-executor:latest"))
       .setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath))
       .set("spark.executor.memory", "500m")
       .set("spark.executor.cores", "1")
@@ -69,4 +73,25 @@
       .set("spark.testing", "false")
       .set(WAIT_FOR_APP_COMPLETION, false)
   }
-}
+
+  // Builds a typed client for a service running inside the cluster by proxying
+  // through the API server: <master>/api/v1/proxy/namespaces/<namespace>/services/<service>.
+  def getService[T: ClassTag](
+      serviceName: String,
+      namespace: String,
+      servicePortName: String,
+      servicePath: String = ""): T = synchronized {
+    val url = s"${
+      Array[String](
+        s"${kubernetesClient.getMasterUrl}",
+        "api", "v1", "proxy",
+        "namespaces", namespace,
+        "services", serviceName).mkString("/")
+      }" +
+      s":$servicePortName$servicePath"
+    val kubernetesConf = kubernetesClient.getConfiguration
+    val sslContext = SSLUtils.sslContext(kubernetesConf)
+    val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager]
+    HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager)
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala
index a4e3353032b71..4cbd074547915 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala
@@ -18,6 +18,8 @@ package org.apache.spark.deploy.kubernetes.integrationtest
 
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
+
 import com.google.common.collect.ImmutableList
 import com.google.common.util.concurrent.SettableFuture
 import io.fabric8.kubernetes.api.model.Pod
@@ -25,26 +27,28 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.scalatest.{BeforeAndAfter, DoNotDiscover}
 import org.scalatest.concurrent.Eventually
-import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.SSLUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
 import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
 import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
 import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
 import org.apache.spark.util.Utils
 
 @DoNotDiscover
-private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter {
+private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
+  extends SparkFunSuite with BeforeAndAfter {
 
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkConf: SparkConf = _
 
   override def beforeAll(): Unit = {
-    kubernetesTestComponents = new KubernetesTestComponents()
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
     kubernetesTestComponents.createNamespace()
   }
 
@@ -85,7 +89,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
       .get(0)
       .getMetadata
       .getName
-    Minikube.getService[SparkRestApiV1](serviceName,
+    
kubernetesTestComponents.getService[SparkRestApiV1](serviceName, kubernetesTestComponents.namespace, "spark-ui-port") } @@ -168,6 +172,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Enable SSL on the driver submit server") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( Minikube.getMinikubeIp, "changeit", @@ -188,6 +194,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Enable SSL on the driver submit server using PEM files") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}") sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}") @@ -201,6 +209,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Added files should exist on the driver.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath) sparkConf.setAppName("spark-file-existence-test") val podCompletedFuture = SettableFuture.create[Boolean] @@ -257,6 +267,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Use external URI provider") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + val externalUriProviderWatch = new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient) Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services() @@ -288,6 +300,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Mount the Kubernetes credentials onto the driver pod") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, kubernetesTestComponents.clientConfig.getCaCertFile) sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 0d74067334028..8fa7cbd52ee83 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -21,14 +21,17 @@ import java.util.UUID import org.scalatest.{BeforeAndAfter, DoNotDiscover} import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions} +import org.apache.spark._ import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube +import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl} @DoNotDiscover -private[spark] class KubernetesV2Suite extends SparkFunSuite with 
BeforeAndAfter {
+private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
+  extends SparkFunSuite with BeforeAndAfter {
 
   private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
   private var kubernetesTestComponents: KubernetesTestComponents = _
@@ -36,7 +39,7 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
   private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _
 
   override def beforeAll(): Unit = {
-    kubernetesTestComponents = new KubernetesTestComponents
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
     resourceStagingServerLauncher = new ResourceStagingServerLauncher(
       kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
   }
@@ -54,11 +57,15 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
   }
 
   test("Use submission v2.") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
     launchStagingServer(SSLOptions())
     runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
   }
 
   test("Enable SSL on the submission server") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
     val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
       ipAddress = Minikube.getMinikubeIp,
       keyStorePassword = "keyStore",
@@ -81,6 +88,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
   }
 
   test("Use container-local resources without the resource staging server") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
     sparkConf.setJars(Seq(
       KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
       KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH))
@@ -88,6 +97,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
   }
 
   private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
     val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
       resourceStagingServerSslOptions)
     val resourceStagingServerUriScheme = if (resourceStagingServerSslOptions.enabled) {
@@ -96,7 +107,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
       "http"
     }
     sparkConf.set(RESOURCE_STAGING_SERVER_URI,
-      s"$resourceStagingServerUriScheme://${Minikube.getMinikubeIp}:$resourceStagingServerPort")
+      s"$resourceStagingServerUriScheme://" +
+        s"${Minikube.getMinikubeIp}:$resourceStagingServerPort")
   }
 
   private def runSparkAppAndVerifyCompletion(appResource: String): Unit = {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala
new file mode 100644
index 0000000000000..d0bfac3085487
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] object ProcessUtils extends Logging {
+  /**
+   * executeProcess runs a command, logging its output as it is produced, and
+   * returns the output lines if the process completes within `timeout` seconds.
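+   * For example, executeProcess(Array("minikube", "status"), 30) runs
+   * "minikube status" and returns whatever the command printed.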
+   */
+  def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
+    val pb = new ProcessBuilder().command(fullCommand: _*)
+    pb.redirectErrorStream(true)
+    val proc = pb.start()
+    val outputLines = new ArrayBuffer[String]
+
+    Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput =>
+      Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) =>
+        var line: String = null
+        do {
+          line = bufferedOutput.readLine()
+          if (line != null) {
+            logInfo(line)
+            outputLines += line
+          }
+        } while (line != null)
+      }
+    }
+    assert(proc.waitFor(timeout, TimeUnit.SECONDS),
+      s"Timed out while executing ${fullCommand.mkString(" ")}")
+    assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}")
+    outputLines.toSeq
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala
new file mode 100644
index 0000000000000..1ef096be4af02
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE
+
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+
+import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.GCE_TEST_BACKEND
+
+/**
+ * A backend for a pre-existing Kubernetes cluster (for example on GCE), reached
+ * through the master URL passed as the spark.kubernetes.test.master property.
+ */
+private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
+  private var defaultClient: DefaultKubernetesClient = _
+
+  override def initialize(): Unit = {
+    val k8ConfBuilder = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(resolveK8sMaster(master))
+    defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build)
+  }
+
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
+  }
+
+  override def name(): String = GCE_TEST_BACKEND
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
new file mode 100644
index 0000000000000..c5bc923dd51a6
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes.integrationtest.backend
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE.GCETestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.MinikubeTestBackend
+
+private[spark] trait IntegrationTestBackend {
+  def name(): String
+  def initialize(): Unit
+  def getKubernetesClient(): DefaultKubernetesClient
+  def cleanUp(): Unit = {}
+}
+
+private[spark] object IntegrationTestBackendFactory {
+  def getTestBackend(): IntegrationTestBackend = {
+    // Target an existing cluster when spark.kubernetes.test.master is set;
+    // otherwise start a local Minikube VM and build the docker images in it.
+    Option(System.getProperty("spark.kubernetes.test.master"))
+      .map(new GCETestBackend(_))
+      .getOrElse(new MinikubeTestBackend())
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala
similarity index 64%
rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
rename to resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala
index 81491be944d3e..7c4b344e8f72b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala
@@ -14,20 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ -package org.apache.spark.deploy.kubernetes.integrationtest.minikube +package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube -import java.io.{BufferedReader, InputStreamReader} import java.nio.file.Paths -import java.util.concurrent.TimeUnit -import java.util.regex.Pattern -import javax.net.ssl.X509TrustManager import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} -import io.fabric8.kubernetes.client.internal.SSLUtils -import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag -import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil +import org.apache.spark.deploy.kubernetes.integrationtest.ProcessUtils import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -93,7 +86,7 @@ private[spark] object Minikube extends Logging { } def getKubernetesClient: DefaultKubernetesClient = synchronized { - val kubernetesMaster = s"https://$getMinikubeIp:8443" + val kubernetesMaster = s"https://${getMinikubeIp}:8443" val userHome = System.getProperty("user.home") val kubernetesConf = new ConfigBuilder() .withApiVersion("v1") @@ -105,32 +98,6 @@ private[spark] object Minikube extends Logging { new DefaultKubernetesClient(kubernetesConf) } - def getService[T: ClassTag]( - serviceName: String, - namespace: String, - servicePortName: String, - servicePath: String = ""): T = synchronized { - val kubernetesMaster = s"https://$getMinikubeIp:8443" - val url = s"${ - Array[String]( - kubernetesMaster, - "api", "v1", "proxy", - "namespaces", namespace, - "services", serviceName).mkString("/")}" + - s":$servicePortName$servicePath" - val userHome = System.getProperty("user.home") - val kubernetesConf = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(kubernetesMaster) - .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) - .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) - .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) - .build() - val sslContext = SSLUtils.sslContext(kubernetesConf) - val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager] - HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager) - } - def executeMinikubeSsh(command: String): Unit = { executeMinikube("ssh", command) } @@ -141,28 +108,8 @@ private[spark] object Minikube extends Logging { throw new IllegalStateException("Failed to make the Minikube binary executable.") } } - val fullCommand = Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args - val pb = new ProcessBuilder().command(fullCommand: _*) - pb.redirectErrorStream(true) - val proc = pb.start() - val outputLines = new ArrayBuffer[String] - - Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => - Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => - var line: String = null - do { - line = bufferedOutput.readLine() - if (line != null) { - logInfo(line) - outputLines += line - } - } while (line != null) - } - } - assert(proc.waitFor(MINIKUBE_STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS), - s"Timed out while executing $action on minikube.") - assert(proc.exitValue == 0, s"Failed to execute minikube $action ${args.mkString(" ")}") - outputLines.toSeq + ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, + MINIKUBE_STARTUP_TIMEOUT_SECONDS) } } diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
new file mode 100644
index 0000000000000..6e0049b813719
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
+import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
+
+private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
+  private var defaultClient: DefaultKubernetesClient = _
+
+  override def initialize(): Unit = {
+    Minikube.startMinikube()
+    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+    defaultClient = Minikube.getKubernetesClient
+  }
+
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
+  }
+
+  override def cleanUp(): Unit = {
+    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
+      Minikube.deleteMinikube()
+    }
+  }
+
+  override def name(): String = MINIKUBE_TEST_BACKEND
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala
new file mode 100644
index 0000000000000..8207198b529d2
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest
+
+package object constants {
+  // Backend names returned by IntegrationTestBackend.name(); tests use these
+  // to skip functionality that only works against a particular backend.
+  val MINIKUBE_TEST_BACKEND = "minikube"
+  val GCE_TEST_BACKEND = "gce"
+}