From cb24332f0fc15d8f681dcf7f914bd2d1be75c0e9 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 25 Apr 2017 10:40:58 -0700 Subject: [PATCH 1/8] Part 1: making test code cluster-agnostic --- .../ExternalUriProviderWatch.scala | 17 +++-- .../integrationtest/KubernetesClient.scala | 66 +++++++++++++++++++ .../integrationtest/KubernetesSuite.scala | 8 +-- .../KubernetesTestComponents.scala | 62 +++++++++++++---- .../integrationtest/KubernetesV1Suite.scala | 16 ++++- .../integrationtest/KubernetesV2Suite.scala | 13 +++- .../kubernetes/integrationtest/Util.scala | 52 +++++++++++++++ .../integrationtest/minikube/Minikube.scala | 53 ++------------- 8 files changed, 212 insertions(+), 75 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala index 3199a8c385f95..f6373a94ab5b8 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala @@ -32,7 +32,8 @@ import org.apache.spark.internal.Logging * for tests - essentially forces the service to revert back to being exposed * on NodePort. 
*/ -private[spark] class ExternalUriProviderWatch(kubernetesClient: KubernetesClient) +private[spark] class ExternalUriProviderWatch( + kubernetestTestComponents: KubernetesTestComponents) extends Watcher[Service] with Logging { // Visible for testing @@ -45,7 +46,10 @@ private[spark] class ExternalUriProviderWatch(kubernetesClient: KubernetesClient .asScala .get(ANNOTATION_PROVIDE_EXTERNAL_URI).foreach { _ => if (!annotationSet.getAndSet(true)) { - val nodePortService = kubernetesClient.services().withName(service.getMetadata.getName) + val nodePortService = kubernetestTestComponents + .kubernetesClient + .services() + .withName(service.getMetadata.getName) .edit() .editSpec() .withType("NodePort") @@ -58,8 +62,13 @@ private[spark] class ExternalUriProviderWatch(kubernetesClient: KubernetesClient .find(_.getName == SUBMISSION_SERVER_PORT_NAME) .map(_.getNodePort) .getOrElse(throw new IllegalStateException("Submission server port not found.")) - val resolvedNodePortUri = s"http://${Minikube.getMinikubeIp}:$submissionServerPort" - kubernetesClient.services().withName(service.getMetadata.getName).edit() + val resolvedNodePortUri = s"http://" + + s"${Minikube.getMinikubeIp}:$submissionServerPort" + kubernetestTestComponents + .kubernetesClient + .services() + .withName(service.getMetadata.getName) + .edit() .editMetadata() .addToAnnotations(ANNOTATION_RESOLVED_EXTERNAL_URI, resolvedNodePortUri) .endMetadata() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala new file mode 100644 index 0000000000000..b5af67000859e --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.nio.file.Paths +import javax.net.ssl.X509TrustManager + +import scala.reflect.ClassTag + +import io.fabric8.kubernetes.client.internal.SSLUtils +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster +import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil + +object TestBackend extends Enumeration { + val SingleNode, MultiNode = Value +} + +object KubernetesClient { + var defaultClient: DefaultKubernetesClient = _ + var testBackend: TestBackend.Value = _ + + def getClient(): DefaultKubernetesClient = { + if (defaultClient == null) { + createClient + } + defaultClient + } + + private def createClient(): Unit = { + System.getProperty("spark.docker.test.master") match { + case null => + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + defaultClient = Minikube.getKubernetesClient + testBackend = TestBackend.SingleNode + + case _ => + val master = System.getProperty("spark.docker.test.master") + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(resolveK8sMaster(master)) + val k8ClientConfig = k8ConfBuilder.build + defaultClient = new DefaultKubernetesClient(k8ClientConfig) + testBackend = TestBackend.MultiNode + } + } +} \ No newline at end of file diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index abbf7e4d5ce1b..b5c311e4e0266 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -20,21 +20,17 @@ import java.nio.file.Paths import com.google.common.base.Charsets import com.google.common.io.Files +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} import org.scalatest.Suite import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube private[spark] class KubernetesSuite extends SparkFunSuite { - - override def beforeAll(): Unit = { - Minikube.startMinikube() - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() - } - override def afterAll(): Unit = { if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { Minikube.deleteMinikube() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala index 53e02f9e479c1..e500f5fcb0166 100644 --- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala @@ -17,22 +17,29 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.util.UUID +import javax.net.ssl.X509TrustManager import org.scalatest.concurrent.Eventually import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import io.fabric8.kubernetes.client.{BaseClient, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.internal.SSLUtils +import okhttp3.OkHttpClient import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube +import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil -private[spark] class KubernetesTestComponents { +private[spark] class KubernetesTestComponents (defaultClient: DefaultKubernetesClient) { val namespace = UUID.randomUUID().toString.replaceAll("-", "") - val kubernetesClient = Minikube.getKubernetesClient.inNamespace(namespace) + val kubernetesClient = defaultClient.inNamespace(namespace) val clientConfig = kubernetesClient.getConfiguration def createNamespace(): Unit = { - Minikube.getKubernetesClient.namespaces.createNew() + defaultClient.namespaces.createNew() .withNewMetadata() .withName(namespace) .endMetadata() @@ -40,9 +47,9 @@ private[spark] class KubernetesTestComponents { } def deleteNamespace(): Unit = { - Minikube.getKubernetesClient.namespaces.withName(namespace).delete() + defaultClient.namespaces.withName(namespace).delete() Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { - val namespaceList = Minikube.getKubernetesClient + val namespaceList = defaultClient .namespaces() .list() .getItems() @@ -53,13 +60,12 @@ private[spark] class KubernetesTestComponents { def newSparkConf(): SparkConf = { new SparkConf(true) - .setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443") - .set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile) - .set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile) - .set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile) + .setMaster(s"k8s://${kubernetesClient.getMasterUrl}") .set(KUBERNETES_NAMESPACE, namespace) - .set(DRIVER_DOCKER_IMAGE, "spark-driver:latest") - .set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest") + .set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver:latest")) + .set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor:latest")) .setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath)) .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") @@ -69,4 +75,38 @@ private[spark] class KubernetesTestComponents { .set("spark.testing", "false") .set(WAIT_FOR_APP_COMPLETION, false) } + + + def getHttpClient(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } + + def getService[T: ClassTag]( + serviceName: String, + namespace: String, + servicePortName: String, + servicePath: String = ""): T = synchronized { + val kubernetesMaster = s"${defaultClient.getMasterUrl}" + val httpClient = 
getHttpClient(kubernetesClient.asInstanceOf[BaseClient]) + + val url = s"${ + Array[String]( + s"${kubernetesClient.getMasterUrl}", + "api", "v1", "proxy", + "namespaces", namespace, + "services", serviceName).mkString("/")}" + + s":$servicePortName$servicePath" + val userHome = System.getProperty("user.home") + val kubernetesConf = kubernetesClient.getConfiguration + val sslContext = SSLUtils.sslContext(kubernetesConf) + val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager] + HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager) + } + } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala index a4e3353032b71..1d97806efa01c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala @@ -44,7 +44,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter private var sparkConf: SparkConf = _ override def beforeAll(): Unit = { - kubernetesTestComponents = new KubernetesTestComponents() + kubernetesTestComponents = new KubernetesTestComponents(KubernetesClient.getClient()) kubernetesTestComponents.createNamespace() } @@ -85,7 +85,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter .get(0) .getMetadata .getName - Minikube.getService[SparkRestApiV1](serviceName, + kubernetesTestComponents.getService[SparkRestApiV1](serviceName, kubernetesTestComponents.namespace, "spark-ui-port") } @@ -168,6 +168,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Enable SSL on the driver submit server") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( Minikube.getMinikubeIp, "changeit", @@ -188,6 +190,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Enable SSL on the driver submit server using PEM files") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}") sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}") @@ -201,6 +205,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Added files should exist on the driver.") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath) sparkConf.setAppName("spark-file-existence-test") val podCompletedFuture = SettableFuture.create[Boolean] @@ -257,8 +263,10 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Use external URI provider") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + val externalUriProviderWatch = - new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient) + new ExternalUriProviderWatch(kubernetesTestComponents) 
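
The `Utils.tryWithResource` call just below follows the standard loan pattern. For reference, a minimal standalone sketch of that pattern (the real helper lives in Spark's private `org.apache.spark.util.Utils`, so the object name and exact signature here are assumptions):

```scala
import java.io.Closeable

object ResourceUtil {
  // Loan pattern: the resource (here, the Kubernetes service watch) is
  // always closed, even if the body throws.
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }
}
```
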
Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services() .withLabel("spark-app-name", "spark-pi") .watch(externalUriProviderWatch)) { _ => @@ -288,6 +296,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Mount the Kubernetes credentials onto the driver pod") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, kubernetesTestComponents.clientConfig.getCaCertFile) sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 0d74067334028..6fb9c9843e940 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -36,7 +36,7 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _ override def beforeAll(): Unit = { - kubernetesTestComponents = new KubernetesTestComponents + kubernetesTestComponents = new KubernetesTestComponents(KubernetesClient.getClient()) resourceStagingServerLauncher = new ResourceStagingServerLauncher( kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) } @@ -54,11 +54,15 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter } test("Use submission v2.") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + launchStagingServer(SSLOptions()) runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } test("Enable SSL on the submission server") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( ipAddress = Minikube.getMinikubeIp, keyStorePassword = "keyStore", @@ -81,6 +85,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter } test("Use container-local resources without the resource staging server") { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + sparkConf.setJars(Seq( KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) @@ -88,6 +94,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter } private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { + assume(KubernetesClient.testBackend == TestBackend.SingleNode) + val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer( resourceStagingServerSslOptions) val resourceStagingServerUriScheme = if (resourceStagingServerSslOptions.enabled) { @@ -96,7 +104,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter "http" } sparkConf.set(RESOURCE_STAGING_SERVER_URI, - s"$resourceStagingServerUriScheme://${Minikube.getMinikubeIp}:$resourceStagingServerPort") + s"$resourceStagingServerUriScheme://" + + s"${Minikube.getMinikubeIp}:$resourceStagingServerPort") } private def runSparkAppAndVerifyCompletion(appResource: String): Unit = { diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala new file mode 100644 index 0000000000000..115f16bdb63b1 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.integrationtest + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube.logInfo +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +object Util extends Logging { + def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = { + val pb = new ProcessBuilder().command(fullCommand: _*) + pb.redirectErrorStream(true) + val proc = pb.start() + val outputLines = new ArrayBuffer[String] + + Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => + Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => + var line: String = null + do { + line = bufferedOutput.readLine() + if (line != null) { + logInfo(line) + outputLines += line + } + } while (line != null) + } + } + assert(proc.waitFor(timeout, TimeUnit.SECONDS), + s"Timed out while executing ${fullCommand.mkString(" ")}") + assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}") + outputLines.toSeq + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala index 81491be944d3e..3eea2cc57b26e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.client.internal.SSLUtils import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag +import org.apache.spark.deploy.kubernetes.integrationtest.Util import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -93,7 +94,7 @@ private[spark] object Minikube extends Logging { } def getKubernetesClient: 
DefaultKubernetesClient = synchronized { - val kubernetesMaster = s"https://$getMinikubeIp:8443" + val kubernetesMaster = s"https://${getMinikubeIp}:8443" val userHome = System.getProperty("user.home") val kubernetesConf = new ConfigBuilder() .withApiVersion("v1") @@ -105,32 +106,6 @@ private[spark] object Minikube extends Logging { new DefaultKubernetesClient(kubernetesConf) } - def getService[T: ClassTag]( - serviceName: String, - namespace: String, - servicePortName: String, - servicePath: String = ""): T = synchronized { - val kubernetesMaster = s"https://$getMinikubeIp:8443" - val url = s"${ - Array[String]( - kubernetesMaster, - "api", "v1", "proxy", - "namespaces", namespace, - "services", serviceName).mkString("/")}" + - s":$servicePortName$servicePath" - val userHome = System.getProperty("user.home") - val kubernetesConf = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(kubernetesMaster) - .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) - .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) - .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) - .build() - val sslContext = SSLUtils.sslContext(kubernetesConf) - val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager] - HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager) - } - def executeMinikubeSsh(command: String): Unit = { executeMinikube("ssh", command) } @@ -141,28 +116,8 @@ private[spark] object Minikube extends Logging { throw new IllegalStateException("Failed to make the Minikube binary executable.") } } - val fullCommand = Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args - val pb = new ProcessBuilder().command(fullCommand: _*) - pb.redirectErrorStream(true) - val proc = pb.start() - val outputLines = new ArrayBuffer[String] - - Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => - Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => - var line: String = null - do { - line = bufferedOutput.readLine() - if (line != null) { - logInfo(line) - outputLines += line - } - } while (line != null) - } - } - assert(proc.waitFor(MINIKUBE_STARTUP_TIMEOUT_SECONDS, TimeUnit.SECONDS), - s"Timed out while executing $action on minikube.") - assert(proc.exitValue == 0, s"Failed to execute minikube $action ${args.mkString(" ")}") - outputLines.toSeq + Util.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, + MINIKUBE_STARTUP_TIMEOUT_SECONDS) } } From 3f6bc24c78c2df4ee2bfe7465eea3b4fdcb85f54 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 25 Apr 2017 14:59:18 -0700 Subject: [PATCH 2/8] Final checked --- resource-managers/kubernetes/README.md | 8 +++++ .../ExternalUriProviderWatch.scala | 17 +++-------- .../integrationtest/KubernetesClient.scala | 18 ++++++------ .../integrationtest/KubernetesSuite.scala | 11 ++++--- .../KubernetesTestComponents.scala | 29 +++++++++---------- .../integrationtest/KubernetesV1Suite.scala | 2 +- 6 files changed, 41 insertions(+), 44 deletions(-) diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index d70c38fdc64d5..a76142c913e2e 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -61,6 +61,14 @@ build/mvn integration-test \ -pl resource-managers/kubernetes/integration-tests -am ``` +# Running against an arbitrary 
cluster + +In order to run against any cluster, use the following: +build/mvn integration-test \ + -Pkubernetes -Pkubernetes-integration-tests \ + -pl resource-managers/kubernetes/integration-tests -am + -DextraScalaTestArgs="-Dspark.docker.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage=" + # Preserve the Minikube VM The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala index f6373a94ab5b8..3199a8c385f95 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala @@ -32,8 +32,7 @@ import org.apache.spark.internal.Logging * for tests - essentially forces the service to revert back to being exposed * on NodePort. */ -private[spark] class ExternalUriProviderWatch( - kubernetestTestComponents: KubernetesTestComponents) +private[spark] class ExternalUriProviderWatch(kubernetesClient: KubernetesClient) extends Watcher[Service] with Logging { // Visible for testing @@ -46,10 +45,7 @@ private[spark] class ExternalUriProviderWatch( .asScala .get(ANNOTATION_PROVIDE_EXTERNAL_URI).foreach { _ => if (!annotationSet.getAndSet(true)) { - val nodePortService = kubernetestTestComponents - .kubernetesClient - .services() - .withName(service.getMetadata.getName) + val nodePortService = kubernetesClient.services().withName(service.getMetadata.getName) .edit() .editSpec() .withType("NodePort") @@ -62,13 +58,8 @@ private[spark] class ExternalUriProviderWatch( .find(_.getName == SUBMISSION_SERVER_PORT_NAME) .map(_.getNodePort) .getOrElse(throw new IllegalStateException("Submission server port not found.")) - val resolvedNodePortUri = s"http://" + - s"${Minikube.getMinikubeIp}:$submissionServerPort" - kubernetestTestComponents - .kubernetesClient - .services() - .withName(service.getMetadata.getName) - .edit() + val resolvedNodePortUri = s"http://${Minikube.getMinikubeIp}:$submissionServerPort" + kubernetesClient.services().withName(service.getMetadata.getName).edit() .editMetadata() .addToAnnotations(ANNOTATION_RESOLVED_EXTERNAL_URI, resolvedNodePortUri) .endMetadata() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala index b5af67000859e..f1c68763cad2c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala @@ -17,18 +17,11 @@ package org.apache.spark.deploy.kubernetes.integrationtest -import java.nio.file.Paths -import javax.net.ssl.X509TrustManager - -import scala.reflect.ClassTag - -import io.fabric8.kubernetes.client.internal.SSLUtils import io.fabric8.kubernetes.client.{ConfigBuilder, 
DefaultKubernetesClient} import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube -import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil object TestBackend extends Enumeration { val SingleNode, MultiNode = Value @@ -40,12 +33,19 @@ object KubernetesClient { def getClient(): DefaultKubernetesClient = { if (defaultClient == null) { - createClient + createDefaultClient } defaultClient } - private def createClient(): Unit = { + def cleanUp(): Unit = { + if (testBackend == TestBackend.SingleNode + && !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } + + private def createDefaultClient(): Unit = { System.getProperty("spark.docker.test.master") match { case null => Minikube.startMinikube() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index b5c311e4e0266..6b421d4d0607f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -20,21 +20,20 @@ import java.nio.file.Paths import com.google.common.base.Charsets import com.google.common.io.Files -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} import org.scalatest.Suite import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster -import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube private[spark] class KubernetesSuite extends SparkFunSuite { + override def beforeAll(): Unit = { + KubernetesClient.getClient() + } + override def afterAll(): Unit = { - if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { - Minikube.deleteMinikube() - } + KubernetesClient.cleanUp() } override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala index e500f5fcb0166..4527829f401bf 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala @@ -19,20 +19,19 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.util.UUID import javax.net.ssl.X509TrustManager -import org.scalatest.concurrent.Eventually import scala.collection.JavaConverters._ import scala.reflect.ClassTag import io.fabric8.kubernetes.client.{BaseClient, DefaultKubernetesClient} import 
io.fabric8.kubernetes.client.internal.SSLUtils import okhttp3.OkHttpClient +import org.scalatest.concurrent.Eventually import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil -private[spark] class KubernetesTestComponents (defaultClient: DefaultKubernetesClient) { +private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { val namespace = UUID.randomUUID().toString.replaceAll("-", "") val kubernetesClient = defaultClient.inNamespace(namespace) @@ -76,17 +75,6 @@ private[spark] class KubernetesTestComponents (defaultClient: DefaultKubernetesC .set(WAIT_FOR_APP_COMPLETION, false) } - - def getHttpClient(client: BaseClient): OkHttpClient = { - val field = classOf[BaseClient].getDeclaredField("httpClient") - try { - field.setAccessible(true) - field.get(client).asInstanceOf[OkHttpClient] - } finally { - field.setAccessible(false) - } - } - def getService[T: ClassTag]( serviceName: String, namespace: String, @@ -100,7 +88,8 @@ private[spark] class KubernetesTestComponents (defaultClient: DefaultKubernetesC s"${kubernetesClient.getMasterUrl}", "api", "v1", "proxy", "namespaces", namespace, - "services", serviceName).mkString("/")}" + + "services", serviceName).mkString("/") + }" + s":$servicePortName$servicePath" val userHome = System.getProperty("user.home") val kubernetesConf = kubernetesClient.getConfiguration @@ -109,4 +98,14 @@ private[spark] class KubernetesTestComponents (defaultClient: DefaultKubernetesC HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager) } + def getHttpClient(client: BaseClient): OkHttpClient = { + val field = classOf[BaseClient].getDeclaredField("httpClient") + try { + field.setAccessible(true) + field.get(client).asInstanceOf[OkHttpClient] + } finally { + field.setAccessible(false) + } + } + } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala index 1d97806efa01c..ddb4a28f0dd52 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala @@ -266,7 +266,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter assume(KubernetesClient.testBackend == TestBackend.SingleNode) val externalUriProviderWatch = - new ExternalUriProviderWatch(kubernetesTestComponents) + new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient) Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services() .withLabel("spark-app-name", "spark-pi") .watch(externalUriProviderWatch)) { _ => From 1f630146b550b0fa4ae44f9669b0f00e4e0b13c8 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 25 Apr 2017 15:18:13 -0700 Subject: [PATCH 3/8] Move all test code into KubernetesTestComponents --- .../integrationtest/KubernetesClient.scala | 66 ------------------- .../KubernetesTestComponents.scala | 45 ++++++++++++- 2 files changed, 44 insertions(+), 67 deletions(-) delete mode 100644 
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala deleted file mode 100644 index f1c68763cad2c..0000000000000 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesClient.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.kubernetes.integrationtest - -import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} - -import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster -import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube - -object TestBackend extends Enumeration { - val SingleNode, MultiNode = Value -} - -object KubernetesClient { - var defaultClient: DefaultKubernetesClient = _ - var testBackend: TestBackend.Value = _ - - def getClient(): DefaultKubernetesClient = { - if (defaultClient == null) { - createDefaultClient - } - defaultClient - } - - def cleanUp(): Unit = { - if (testBackend == TestBackend.SingleNode - && !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { - Minikube.deleteMinikube() - } - } - - private def createDefaultClient(): Unit = { - System.getProperty("spark.docker.test.master") match { - case null => - Minikube.startMinikube() - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() - defaultClient = Minikube.getKubernetesClient - testBackend = TestBackend.SingleNode - - case _ => - val master = System.getProperty("spark.docker.test.master") - var k8ConfBuilder = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(resolveK8sMaster(master)) - val k8ClientConfig = k8ConfBuilder.build - defaultClient = new DefaultKubernetesClient(k8ClientConfig) - testBackend = TestBackend.MultiNode - } - } -} \ No newline at end of file diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala index 4527829f401bf..6cdb357e2dd5d 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala @@ -22,13 +22,15 @@ import javax.net.ssl.X509TrustManager import scala.collection.JavaConverters._ import scala.reflect.ClassTag -import io.fabric8.kubernetes.client.{BaseClient, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.{BaseClient, ConfigBuilder, DefaultKubernetesClient} import io.fabric8.kubernetes.client.internal.SSLUtils import okhttp3.OkHttpClient import org.scalatest.concurrent.Eventually import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { @@ -107,5 +109,46 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl field.setAccessible(false) } } +} + +object TestBackend extends Enumeration { + val SingleNode, MultiNode = Value +} + +object KubernetesClient { + var defaultClient: DefaultKubernetesClient = _ + var testBackend: TestBackend.Value = _ + + def getClient(): DefaultKubernetesClient = { + if (defaultClient == null) { + createDefaultClient + } + defaultClient + } + private def createDefaultClient(): Unit = { + System.getProperty("spark.docker.test.master") match { + case null => + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + defaultClient = Minikube.getKubernetesClient + testBackend = TestBackend.SingleNode + + case _ => + val master = System.getProperty("spark.docker.test.master") + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(resolveK8sMaster(master)) + val k8ClientConfig = k8ConfBuilder.build + defaultClient = new DefaultKubernetesClient(k8ClientConfig) + testBackend = TestBackend.MultiNode + } + } + + def cleanUp(): Unit = { + if (testBackend == TestBackend.SingleNode + && !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } } From ed1cc2780fe6d318e4914e489ad734d64818b090 Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 25 Apr 2017 16:51:50 -0700 Subject: [PATCH 4/8] Addressed comments --- resource-managers/kubernetes/README.md | 2 +- .../integrationtest/KubernetesSuite.scala | 11 ++-- .../KubernetesTestComponents.scala | 51 +++++++++---------- .../integrationtest/KubernetesV1Suite.scala | 18 ++++--- .../integrationtest/KubernetesV2Suite.scala | 15 +++--- .../{Util.scala => ProcessUtils.scala} | 7 ++- .../integrationtest/minikube/Minikube.scala | 12 +---- 7 files changed, 57 insertions(+), 59 deletions(-) rename resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/{Util.scala => ProcessUtils.scala} (92%) diff --git a/resource-managers/kubernetes/README.md b/resource-managers/kubernetes/README.md index a76142c913e2e..fd1ad29eb795d 100644 --- a/resource-managers/kubernetes/README.md +++ b/resource-managers/kubernetes/README.md @@ -67,7 +67,7 @@ In order to run against any cluster, use the following: build/mvn integration-test \ -Pkubernetes -Pkubernetes-integration-tests \ -pl resource-managers/kubernetes/integration-tests -am - -DextraScalaTestArgs="-Dspark.docker.test.master=k8s://https:// -Dspark.docker.test.driverImage= 
-Dspark.docker.test.executorImage=" + -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage=" # Preserve the Minikube VM diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 6b421d4d0607f..e90c190b60224 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -25,21 +25,22 @@ import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube private[spark] class KubernetesSuite extends SparkFunSuite { + private var kubernetesTestClient: KubernetesTestClient = _ + override def beforeAll(): Unit = { - KubernetesClient.getClient() + kubernetesTestClient = new KubernetesTestClient() } override def afterAll(): Unit = { - KubernetesClient.cleanUp() + kubernetesTestClient.cleanUp() } override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = { Vector( - new KubernetesV1Suite, - new KubernetesV2Suite) + new KubernetesV1Suite(kubernetesTestClient), + new KubernetesV2Suite(kubernetesTestClient)) } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala index 6cdb357e2dd5d..065f8033dd913 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala @@ -111,42 +111,41 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl } } -object TestBackend extends Enumeration { +object KubernetesTestBackend extends Enumeration { val SingleNode, MultiNode = Value } -object KubernetesClient { +/** + * Creates and holds a Kubernetes client for executing tests. + * When master and driver/executor images are supplied, we return a client + * for that cluster. 
By default, we return a Minikube client + */ + +class KubernetesTestClient { var defaultClient: DefaultKubernetesClient = _ - var testBackend: TestBackend.Value = _ + var testBackend: KubernetesTestBackend.Value = _ + + Option(System.getProperty("spark.kubernetes.test.master")).map { + master => + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(resolveK8sMaster(master)) + val k8ClientConfig = k8ConfBuilder.build + defaultClient = new DefaultKubernetesClient(k8ClientConfig) + testBackend = KubernetesTestBackend.MultiNode + }.getOrElse { + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + defaultClient = Minikube.getKubernetesClient + testBackend = KubernetesTestBackend.SingleNode + } def getClient(): DefaultKubernetesClient = { - if (defaultClient == null) { - createDefaultClient - } defaultClient } - private def createDefaultClient(): Unit = { - System.getProperty("spark.docker.test.master") match { - case null => - Minikube.startMinikube() - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() - defaultClient = Minikube.getKubernetesClient - testBackend = TestBackend.SingleNode - - case _ => - val master = System.getProperty("spark.docker.test.master") - var k8ConfBuilder = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(resolveK8sMaster(master)) - val k8ClientConfig = k8ConfBuilder.build - defaultClient = new DefaultKubernetesClient(k8ClientConfig) - testBackend = TestBackend.MultiNode - } - } - def cleanUp(): Unit = { - if (testBackend == TestBackend.SingleNode + if (testBackend == KubernetesTestBackend.SingleNode && !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { Minikube.deleteMinikube() } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala index ddb4a28f0dd52..63031e211b33f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala @@ -18,6 +18,8 @@ package org.apache.spark.deploy.kubernetes.integrationtest import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ + import com.google.common.collect.ImmutableList import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.Pod @@ -25,7 +27,6 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} import io.fabric8.kubernetes.client.Watcher.Action import org.scalatest.{BeforeAndAfter, DoNotDiscover} import org.scalatest.concurrent.Eventually -import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.deploy.kubernetes.SSLUtils @@ -38,13 +39,14 @@ import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus} import org.apache.spark.util.Utils @DoNotDiscover -private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter { +private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClient) + extends SparkFunSuite with BeforeAndAfter { private var kubernetesTestComponents: KubernetesTestComponents = _ private var sparkConf: SparkConf = _ 
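
The constructor parameter introduced above works only because these suites are instantiated by hand from `KubernetesSuite.nestedSuites` rather than discovered by ScalaTest; `@DoNotDiscover` keeps the runner from also trying to instantiate them reflectively, which would fail for a class with no zero-argument constructor. A minimal sketch of the wiring, with hypothetical suite names:

```scala
import org.scalatest.{DoNotDiscover, FunSuite, Suite}

// Child suite takes its dependency through the constructor, so it must not
// be discovered and instantiated reflectively by the ScalaTest runner.
@DoNotDiscover
class BackendDependentSuite(backendName: String) extends FunSuite {
  test("backend is injected") {
    assert(backendName.nonEmpty)
  }
}

// Parent suite constructs the children by hand, mirroring
// KubernetesSuite.nestedSuites in the diff above.
class ParentSuite extends FunSuite {
  override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] =
    Vector(new BackendDependentSuite("minikube"))
}
```
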
override def beforeAll(): Unit = { - kubernetesTestComponents = new KubernetesTestComponents(KubernetesClient.getClient()) + kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient()) kubernetesTestComponents.createNamespace() } @@ -168,7 +170,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Enable SSL on the driver submit server") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair( Minikube.getMinikubeIp, @@ -190,7 +192,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Enable SSL on the driver submit server using PEM files") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp) sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}") @@ -205,7 +207,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Added files should exist on the driver.") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath) sparkConf.setAppName("spark-file-existence-test") @@ -263,7 +265,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Use external URI provider") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) val externalUriProviderWatch = new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient) @@ -296,7 +298,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter } test("Mount the Kubernetes credentials onto the driver pod") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, kubernetesTestComponents.clientConfig.getCaCertFile) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 6fb9c9843e940..4fca10bf30138 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -21,14 +21,15 @@ import java.util.UUID import org.scalatest.{BeforeAndAfter, DoNotDiscover} import org.scalatest.concurrent.Eventually -import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions} +import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.SSLUtils import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl} 
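
The `assume(...)` guards added throughout these suites lean on ScalaTest semantics: when the condition is false, `assume` throws `TestCanceledException`, so the test is reported as canceled rather than failed on backends that cannot support it. A small self-contained illustration (suite and helper names are hypothetical):

```scala
import org.scalatest.FunSuite

class GatedSuite extends FunSuite {
  // Mirrors the backend check: no master property means the local
  // single-node (Minikube) backend is in use.
  private def runningOnMinikube: Boolean =
    System.getProperty("spark.kubernetes.test.master") == null

  test("Minikube-only behaviour") {
    // Canceled, not failed, when running against a remote cluster.
    assume(runningOnMinikube, "requires the single-node Minikube backend")
    assert(1 + 1 == 2) // real assertions would go here
  }
}
```
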
@DoNotDiscover -private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter { +private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClient) + extends SparkFunSuite with BeforeAndAfter { private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") private var kubernetesTestComponents: KubernetesTestComponents = _ @@ -36,7 +37,7 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _ override def beforeAll(): Unit = { - kubernetesTestComponents = new KubernetesTestComponents(KubernetesClient.getClient()) + kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient()) resourceStagingServerLauncher = new ResourceStagingServerLauncher( kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) } @@ -54,14 +55,14 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter } test("Use submission v2.") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) launchStagingServer(SSLOptions()) runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE) } test("Enable SSL on the submission server") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair( ipAddress = Minikube.getMinikubeIp, @@ -85,7 +86,7 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter } test("Use container-local resources without the resource staging server") { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) sparkConf.setJars(Seq( KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, @@ -94,7 +95,7 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter } private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { - assume(KubernetesClient.testBackend == TestBackend.SingleNode) + assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode) val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer( resourceStagingServerSslOptions) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala similarity index 92% rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala rename to resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala index 115f16bdb63b1..aad75ea42eda9 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/Util.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala @@ -21,11 +21,14 @@ import java.util.concurrent.TimeUnit import scala.collection.mutable.ArrayBuffer -import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube.logInfo import 
org.apache.spark.internal.Logging import org.apache.spark.util.Utils -object Util extends Logging { +/** + * ProcessUtils is used to run a command and return the output if it + * completes within timeout seconds. + */ +object ProcessUtils extends Logging { def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = { val pb = new ProcessBuilder().command(fullCommand: _*) pb.redirectErrorStream(true) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala index 3eea2cc57b26e..69b3ff2784796 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala @@ -16,19 +16,11 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest.minikube -import java.io.{BufferedReader, InputStreamReader} import java.nio.file.Paths -import java.util.concurrent.TimeUnit -import java.util.regex.Pattern -import javax.net.ssl.X509TrustManager import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} -import io.fabric8.kubernetes.client.internal.SSLUtils -import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag -import org.apache.spark.deploy.kubernetes.integrationtest.Util -import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil +import org.apache.spark.deploy.kubernetes.integrationtest.ProcessUtils import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -116,7 +108,7 @@ private[spark] object Minikube extends Logging { throw new IllegalStateException("Failed to make the Minikube binary executable.") } } - Util.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, + ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS) } } From b7ac04e08fa0136f657fe953c9cb60460808539b Mon Sep 17 00:00:00 2001 From: foxish Date: Tue, 25 Apr 2017 17:07:15 -0700 Subject: [PATCH 5/8] Fixed doc --- .../deploy/kubernetes/integrationtest/ProcessUtils.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala index aad75ea42eda9..d0bfac3085487 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ProcessUtils.scala @@ -24,11 +24,11 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.internal.Logging import org.apache.spark.util.Utils -/** - * ProcessUtils is used to run a command and return the output if it - * completes within timeout seconds. - */ object ProcessUtils extends Logging { + /** + * executeProcess is used to run a command and return the output if it + * completes within timeout seconds. 
+   */
   def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
     val pb = new ProcessBuilder().command(fullCommand: _*)
     pb.redirectErrorStream(true)
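
(A usage sketch, for orientation only and not part of any patch in this series: with
ProcessUtils as defined above, a test could shell out roughly as follows. The command
and the 60-second timeout are placeholder values.)

  import org.apache.spark.deploy.kubernetes.integrationtest.ProcessUtils

  // Runs `minikube status` with stderr merged into stdout (the builder sets
  // redirectErrorStream(true)) and returns the captured output lines, provided
  // the process completes within the given number of seconds.
  val statusLines: Seq[String] =
    ProcessUtils.executeProcess(Array("minikube", "status"), timeout = 60)
  statusLines.foreach(println)
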
From d4d2e9c3d34922c48804c24fe008eabbe44c6d94 Mon Sep 17 00:00:00 2001
From: Anirudh Ramanathan
Date: Fri, 28 Apr 2017 19:33:41 -0700
Subject: [PATCH 6/8] Restructure the test backends (#248)

* Restructured the test backends

* Address comments

* var -> val
---
 .../ExternalUriProviderWatch.scala            |  2 +-
 .../integrationtest/KubernetesSuite.scala     | 11 +++--
 .../KubernetesTestComponents.scala            | 45 +-----------------
 .../integrationtest/KubernetesV1Suite.scala   | 18 +++----
 .../integrationtest/KubernetesV2Suite.scala   | 18 +++----
 .../backend/GCE/GCETestBackend.scala          | 40 ++++++++++++++++
 .../backend/IntegrationTestBackend.scala      | 42 +++++++++++++++++
 .../{ => backend}/minikube/Minikube.scala     |  2 +-
 .../minikube/MinikubeTestBackend.scala        | 47 +++++++++++++++++++
 .../integrationtest/constants.scala           | 22 +++++++++
 10 files changed, 181 insertions(+), 66 deletions(-)
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
 rename resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/{ => backend}/minikube/Minikube.scala (98%)
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
 create mode 100644 resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala
index 3199a8c385f95..f402d240bfc33 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/ExternalUriProviderWatch.scala
@@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import scala.collection.JavaConverters._
 
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
 import org.apache.spark.internal.Logging
 
 /**
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index e90c190b60224..bd5ff7a005d46 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -25,22 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration
 import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
 
 private[spark] class KubernetesSuite extends SparkFunSuite {
-  private var kubernetesTestClient: KubernetesTestClient = _
+  private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend()
 
   override def beforeAll(): Unit = {
-    kubernetesTestClient = new KubernetesTestClient()
+    testBackend.initialize()
   }
 
   override def afterAll(): Unit = {
-    kubernetesTestClient.cleanUp()
+    testBackend.cleanUp()
   }
 
   override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
     Vector(
-      new KubernetesV1Suite(kubernetesTestClient),
-      new KubernetesV2Suite(kubernetesTestClient))
+      new KubernetesV1Suite(testBackend),
+      new KubernetesV2Suite(testBackend))
   }
 }
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
index 065f8033dd913..d536000f84c5a 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
@@ -30,7 +30,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
 import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil
 
 private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
@@ -109,45 +109,4 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
       field.setAccessible(false)
     }
   }
-}
-
-object KubernetesTestBackend extends Enumeration {
-  val SingleNode, MultiNode = Value
-}
-
-/**
- * Creates and holds a Kubernetes client for executing tests.
- * When master and driver/executor images are supplied, we return a client
- * for that cluster. By default, we return a Minikube client
- */
-
-class KubernetesTestClient {
-  var defaultClient: DefaultKubernetesClient = _
-  var testBackend: KubernetesTestBackend.Value = _
-
-  Option(System.getProperty("spark.kubernetes.test.master")).map {
-    master =>
-      var k8ConfBuilder = new ConfigBuilder()
-        .withApiVersion("v1")
-        .withMasterUrl(resolveK8sMaster(master))
-      val k8ClientConfig = k8ConfBuilder.build
-      defaultClient = new DefaultKubernetesClient(k8ClientConfig)
-      testBackend = KubernetesTestBackend.MultiNode
-  }.getOrElse {
-    Minikube.startMinikube()
-    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
-    defaultClient = Minikube.getKubernetesClient
-    testBackend = KubernetesTestBackend.SingleNode
-  }
-
-  def getClient(): DefaultKubernetesClient = {
-    defaultClient
-  }
-
-  def cleanUp(): Unit = {
-    if (testBackend == KubernetesTestBackend.SingleNode
-      && !System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
-      Minikube.deleteMinikube()
-    }
-  }
-}
+}
\ No newline at end of file
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala
index 63031e211b33f..4cbd074547915 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV1Suite.scala
@@ -32,21 +32,23 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.kubernetes.SSLUtils
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND}
 import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
 import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
 import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
 import org.apache.spark.util.Utils
 
 @DoNotDiscover
-private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter {
+private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
+  extends SparkFunSuite with BeforeAndAfter {
 
   private var kubernetesTestComponents: KubernetesTestComponents = _
   private var sparkConf: SparkConf = _
 
   override def beforeAll(): Unit = {
-    kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient())
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
     kubernetesTestComponents.createNamespace()
   }
@@ -170,7 +172,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
   }
 
   test("Enable SSL on the driver submit server") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
       Minikube.getMinikubeIp,
@@ -192,7 +194,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
   }
 
   test("Enable SSL on the driver submit server using PEM files") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
     sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
@@ -207,7 +209,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
   }
 
   test("Added files should exist on the driver.") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
     sparkConf.setAppName("spark-file-existence-test")
@@ -265,7 +267,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
   }
 
   test("Use external URI provider") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     val externalUriProviderWatch =
       new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
@@ -298,7 +300,7 @@ private[spark] class KubernetesV1Suite(kubernetesTestClient: KubernetesTestClien
   }
 
   test("Mount the Kubernetes credentials onto the driver pod") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
       kubernetesTestComponents.clientConfig.getCaCertFile)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala
index 4fca10bf30138..8fa7cbd52ee83 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala
@@ -21,14 +21,16 @@ import java.util.UUID
 import org.scalatest.{BeforeAndAfter, DoNotDiscover}
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite}
+import org.apache.spark._
 import org.apache.spark.deploy.kubernetes.SSLUtils
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
 import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl}
 
 @DoNotDiscover
-private[spark] class KubernetesV2Suite(kubernetesTestClient: KubernetesTestClient)
+private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
   extends SparkFunSuite with BeforeAndAfter {
 
   private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
@@ -37,7 +39,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClien
   private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _
 
   override def beforeAll(): Unit = {
-    kubernetesTestComponents = new KubernetesTestComponents(kubernetesTestClient.getClient())
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
     resourceStagingServerLauncher = new ResourceStagingServerLauncher(
       kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
   }
@@ -55,14 +57,14 @@ private[spark] class KubernetesV2Suite(kubernetesTestClien
   }
 
   test("Use submission v2.") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     launchStagingServer(SSLOptions())
     runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
   }
 
   test("Enable SSL on the submission server") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
       ipAddress = Minikube.getMinikubeIp,
@@ -86,7 +88,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClien
   }
 
   test("Use container-local resources without the resource staging server") {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     sparkConf.setJars(Seq(
       KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
@@ -95,7 +97,7 @@ private[spark] class KubernetesV2Suite(kubernetesTestClien
   }
 
   private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
-    assume(kubernetesTestClient.testBackend == KubernetesTestBackend.SingleNode)
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
       resourceStagingServerSslOptions)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala
new file mode 100644
index 0000000000000..1ef096be4af02
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/GCE/GCETestBackend.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE
+
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+
+import org.apache.spark.deploy.kubernetes.config.resolveK8sMaster
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.GCE_TEST_BACKEND
+
+private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
+  private var defaultClient: DefaultKubernetesClient = _
+
+  override def initialize(): Unit = {
+    val k8ConfBuilder = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(resolveK8sMaster(master))
+    defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build)
+  }
+
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
+  }
+
+  override def name(): String = GCE_TEST_BACKEND
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
new file mode 100644
index 0000000000000..44030adb47207
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.kubernetes.integrationtest.backend
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.GCE.GCETestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend}
+import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
+
+private[spark] trait IntegrationTestBackend {
+  def name(): String
+  def initialize(): Unit
+  def getKubernetesClient(): DefaultKubernetesClient
+  def cleanUp(): Unit = {}
+}
+
+private[spark] object IntegrationTestBackendFactory {
+  def getTestBackend(): IntegrationTestBackend = {
+    Option(System.getProperty("spark.kubernetes.test.master")).map {
+      master =>
+        new GCETestBackend(master)
+    }.getOrElse {
+      new MinikubeTestBackend()
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala
similarity index 98%
rename from resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
rename to resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala
index 69b3ff2784796..7c4b344e8f72b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/Minikube.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.deploy.kubernetes.integrationtest.minikube
+package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube
 
 import java.nio.file.Paths
 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
new file mode 100644
index 0000000000000..6e0049b813719
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
+import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
+import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
+
+private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
+  private var defaultClient: DefaultKubernetesClient = _
+
+  override def initialize(): Unit = {
+    Minikube.startMinikube()
+    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+    defaultClient = Minikube.getKubernetesClient
+  }
+
+  override def getKubernetesClient(): DefaultKubernetesClient = {
+    defaultClient
+  }
+
+  override def cleanUp(): Unit = {
+    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
+      Minikube.deleteMinikube()
+    }
+  }
+
+  override def name(): String = MINIKUBE_TEST_BACKEND
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala
new file mode 100644
index 0000000000000..8207198b529d2
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/constants.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.integrationtest
+
+package object constants {
+  val MINIKUBE_TEST_BACKEND = "minikube"
+  val GCE_TEST_BACKEND = "gce"
+}
\ No newline at end of file

From de0df159ae6dbeffb2b8665b8612f807a1696741 Mon Sep 17 00:00:00 2001
From: foxish
Date: Mon, 1 May 2017 11:05:29 -0700
Subject: [PATCH 7/8] Comments

---
 .../integrationtest/KubernetesTestComponents.scala       | 1 -
 .../integrationtest/backend/IntegrationTestBackend.scala | 9 +++------
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
index d536000f84c5a..b5a917f5ed41e 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
@@ -83,7 +83,6 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
       servicePortName: String,
       servicePath: String = ""): T = synchronized {
     val kubernetesMaster = s"${defaultClient.getMasterUrl}"
-    val httpClient = getHttpClient(kubernetesClient.asInstanceOf[BaseClient])
 
     val url = s"${
       Array[String](
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
index 44030adb47207..c5bc923dd51a6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/IntegrationTestBackend.scala
@@ -32,11 +32,8 @@ private[spark] trait IntegrationTestBackend {
 
 private[spark] object IntegrationTestBackendFactory {
   def getTestBackend(): IntegrationTestBackend = {
-    Option(System.getProperty("spark.kubernetes.test.master")).map {
-      master =>
-        new GCETestBackend(master)
-    }.getOrElse {
-      new MinikubeTestBackend()
-    }
+    Option(System.getProperty("spark.kubernetes.test.master"))
+      .map(new GCETestBackend(_))
+      .getOrElse(new MinikubeTestBackend())
   }
 }
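
(For orientation, a hedged sketch of what the simplified factory above selects; the
master URL is a placeholder, not a value taken from these patches.)

  // With spark.kubernetes.test.master set, tests target the given cluster through
  // GCETestBackend; with it unset, the factory provisions a local Minikube cluster.
  System.setProperty("spark.kubernetes.test.master", "k8s://https://203.0.113.10")
  val remoteBackend = IntegrationTestBackendFactory.getTestBackend()  // GCETestBackend

  System.clearProperty("spark.kubernetes.test.master")
  val localBackend = IntegrationTestBackendFactory.getTestBackend()   // MinikubeTestBackend
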
From d83f8d165801ce809e46cb4a246310a90851856b Mon Sep 17 00:00:00 2001
From: foxish
Date: Mon, 1 May 2017 15:47:17 -0700
Subject: [PATCH 8/8] Removed dead code

---
 .../KubernetesTestComponents.scala            | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
index b5a917f5ed41e..8cdacee655c05 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesTestComponents.scala
@@ -22,15 +22,12 @@ import javax.net.ssl.X509TrustManager
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
-import io.fabric8.kubernetes.client.{BaseClient, ConfigBuilder, DefaultKubernetesClient}
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import io.fabric8.kubernetes.client.internal.SSLUtils
-import okhttp3.OkHttpClient
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.config._
-import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
-import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
 import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil
 
 private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
@@ -98,14 +95,4 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl
     val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager]
     HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager)
   }
-
-  def getHttpClient(client: BaseClient): OkHttpClient = {
-    val field = classOf[BaseClient].getDeclaredField("httpClient")
-    try {
-      field.setAccessible(true)
-      field.get(client).asInstanceOf[OkHttpClient]
-    } finally {
-      field.setAccessible(false)
-    }
-  }
 }
\ No newline at end of file
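
(A closing sketch of how one more backend could plug into the IntegrationTestBackend
trait introduced in PATCH 6/8. The class name, package and "pre-provisioned" behaviour
below are invented for illustration and appear nowhere in the patches; only the trait
methods and the fabric8 client calls mirror the code above.)

  package org.apache.spark.deploy.kubernetes.integrationtest.backend.custom

  import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

  import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend

  // Hypothetical backend for a cluster that already exists: there is nothing to start
  // or tear down, so initialize() only builds a client, and the trait's default no-op
  // cleanUp() is inherited unchanged.
  private[spark] class PreProvisionedTestBackend(master: String) extends IntegrationTestBackend {
    private var defaultClient: DefaultKubernetesClient = _

    override def initialize(): Unit = {
      val config = new ConfigBuilder()
        .withApiVersion("v1")
        .withMasterUrl(master)
        .build()
      defaultClient = new DefaultKubernetesClient(config)
    }

    override def getKubernetesClient(): DefaultKubernetesClient = defaultClient

    override def name(): String = "pre-provisioned"
  }

Wiring such a backend up would then mean adding a constant to the constants package
object and a branch in IntegrationTestBackendFactory.getTestBackend(), mirroring how
the GCE and Minikube backends are selected in the patches above.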