From e82d4c539da331c62afc3f35ad91ff214e193a19 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 15:16:04 -0800 Subject: [PATCH 01/22] Made test code compile --- integration-test/pom.xml | 369 ++++++++++++++++++ .../integration-test-asset-server/Dockerfile | 21 + .../src/test/resources/log4j.properties | 31 ++ .../k8s/integrationtest/KubernetesSuite.scala | 225 +++++++++++ .../KubernetesTestComponents.scala | 68 ++++ .../deploy/k8s/integrationtest/Logging.scala | 35 ++ .../k8s/integrationtest/ProcessUtils.scala | 52 +++ .../SparkReadinessWatcher.scala | 41 ++ .../StaticAssetServerLauncher.scala | 62 +++ .../deploy/k8s/integrationtest/TestApp.scala | 42 ++ .../deploy/k8s/integrationtest/Utils.scala | 27 ++ .../backend/GCE/GCETestBackend.scala | 39 ++ .../backend/IntegrationTestBackend.scala | 39 ++ .../backend/minikube/Minikube.scala | 131 +++++++ .../minikube/MinikubeTestBackend.scala | 45 +++ .../k8s/integrationtest/constants.scala | 22 ++ .../docker/SparkDockerImageBuilder.scala | 94 +++++ integration-test/test-data/input.txt | 1 + 18 files changed, 1344 insertions(+) create mode 100644 integration-test/pom.xml create mode 100644 integration-test/src/main/docker/integration-test-asset-server/Dockerfile create mode 100644 integration-test/src/test/resources/log4j.properties create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala create mode 100644 integration-test/test-data/input.txt diff --git a/integration-test/pom.xml b/integration-test/pom.xml new file mode 100644 index 0000000..479ce6b --- /dev/null +++ b/integration-test/pom.xml @@ -0,0 +1,369 @@ + + + + 4.0.0 + + spark-kubernetes-integration-tests_2.11 + spark-kubernetes-integration-tests + 0.1-SNAPSHOT + + 3.5 + 2.6.5 + 1.3.9 + 3.0.0 + 1.2.17 + 2.11.8 + 2.11 + kubernetes-integration-tests + + jar + Spark Project Kubernetes Integration Tests + + + + commons-logging + commons-logging + 1.1.1 + + + 
com.fasterxml.jackson.core + jackson-annotations + ${fasterxml.jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${fasterxml.jackson.version} + + + com.fasterxml.jackson.core + jackson-databind + ${fasterxml.jackson.version} + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + ${fasterxml.jackson.version} + + + com.google.code.findbugs + jsr305 + ${jsr305.version} + + + com.google.guava + guava + test + + 18.0 + + + com.spotify + docker-client + 5.0.2 + test + + + + com.fasterxml.jackson.jaxrs + jackson-jaxrs-json-provider + + + com.fasterxml.jackson.core + jackson-databind + + + org.glassfish.jersey.core + jersey-client + + + org.glassfish.jersey.core + jersey-common + + + javax.ws.rs + jsr311-api + + + + + io.fabric8 + kubernetes-client + ${kubernetes.client.version} + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + + log4j + log4j + ${log4j.version} + + + org.apache.commons + commons-lang3 + ${commons-lang3.version} + + + org.scala-lang + scala-library + ${scala.version} + + + org.scalatest + scalatest_${scala.binary.version} + 2.2.6 + test + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.2 + + + + compile + testCompile + + + + + + maven-resources-plugin + 3.0.2 + + + copy-integration-test-http-server-dockerfile + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/dockerfiles + + + src/main/docker + true + + + + + + copy-integration-python + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/python + + + ${project.parent.basedir}/python + + ${project.parent.basedir}/python/.egg + ${project.parent.basedir}/python/dist + + + + + + + copy-integration-r + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/R + + + ${project.parent.basedir}/R + + + + + + copy-integration-data + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/data + + + ${project.parent.basedir}/data + true + + + + + + copy-integration-licenses + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/licenses + + + ${project.parent.basedir}/licenses + true + + + + + + copy-integration-examples-jar + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/examples/jars + + + ${project.parent.basedir}/examples/target/scala-2.11/jars + true + + + + + + copy-integration-examples-src + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/examples/src/main + + + ${project.parent.basedir}/examples/src/main + true + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + create-release-file + pre-integration-test + + run + + + + + + + + + + + com.googlecode.maven-download-plugin + download-maven-plugin + 1.3.0 + + + download-minikube-linux + pre-integration-test + + wget + + + https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64 + ${project.build.directory}/minikube-bin/linux-amd64 + minikube + + + + download-minikube-darwin + pre-integration-test + + wget + + + https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64 + ${project.build.directory}/minikube-bin/darwin-amd64 + minikube + + + + + + + org.scalatest + scalatest-maven-plugin + + + test + + test + + + + (?<!Suite) + + + + integration-test + integration-test + + test + + + + + + + + + diff --git 
a/integration-test/src/main/docker/integration-test-asset-server/Dockerfile b/integration-test/src/main/docker/integration-test-asset-server/Dockerfile new file mode 100644 index 0000000..e26d207 --- /dev/null +++ b/integration-test/src/main/docker/integration-test-asset-server/Dockerfile @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Simple asset server that can provide the integration test jars over HTTP. +FROM trinitronx/python-simplehttpserver:travis-12 + +ADD examples/integration-tests-jars /var/www diff --git a/integration-test/src/test/resources/log4j.properties b/integration-test/src/test/resources/log4j.properties new file mode 100644 index 0000000..866126b --- /dev/null +++ b/integration-test/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/integration-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/integration-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from a few verbose libraries. +log4j.logger.com.sun.jersey=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.spark_project.jetty=WARN diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala new file mode 100644 index 0000000..0642e13 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.{File, FileOutputStream} +import java.nio.file.Paths +import java.util.{Properties, UUID} + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.internal.readiness.Readiness +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory +import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND + +private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { + import KubernetesSuite._ + private val testBackend = IntegrationTestBackendFactory.getTestBackend() + private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") + private var kubernetesTestComponents: KubernetesTestComponents = _ + private var testAppConf: TestAppConf = _ + private var staticAssetServerLauncher: StaticAssetServerLauncher = _ + + override def beforeAll(): Unit = { + testBackend.initialize() + kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) + staticAssetServerLauncher = new StaticAssetServerLauncher( + kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) + } + + override def afterAll(): Unit = { + testBackend.cleanUp() + } + + before { + testAppConf = kubernetesTestComponents.newTestJobConf() + .set("spark.kubernetes.initcontainer.docker.image", "spark-init:latest") + .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") + .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) + kubernetesTestComponents.createNamespace() + } + + after { + kubernetesTestComponents.deleteNamespace() + } + + test("Use container-local resources without the resource staging server") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE) + } + + test("Use remote resources without the resource staging server.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer() + testAppConf.setJars(Seq( + s"$assetServerUri/${EXAMPLES_JAR_FILE.getName}", + s"$assetServerUri/${HELPER_JAR_FILE.getName}" + )) + runSparkPiAndVerifyCompletion() + } + + test("Submit small local files without the resource staging server.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + val testExistenceFileTempDir = Files.createTempDir() + 
testExistenceFileTempDir.deleteOnExit() + val testExistenceFile = new File(testExistenceFileTempDir, "input.txt") + Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8) + testAppConf.set("spark.files", testExistenceFile.getAbsolutePath) + runSparkApplicationAndVerifyCompletion( + CONTAINER_LOCAL_MAIN_APP_RESOURCE, + FILE_EXISTENCE_MAIN_CLASS, + Seq( + s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.", + s"File found on the executors at the relative path ${testExistenceFile.getName} with" + + s" the correct contents."), + Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS)) + } + + test("Use a very long application name.") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + .set("spark.app.name", "long" * 40) + runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE) + } + + private def runSparkPiAndVerifyCompletion(appResource: String = ""): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_PI_MAIN_CLASS, + Seq("Pi is roughly 3"), + Array.empty[String]) + } + + private def runSparkApplicationAndVerifyCompletion( + appResource: String, + mainClass: String, + expectedLogOnCompletion: Seq[String], + appArgs: Array[String]): Unit = { + val appArguments = TestAppArguments( + mainAppResource = appResource, + mainClass = mainClass, + driverArgs = appArgs, + hadoopConfDir = None) + TestApp.run(testAppConf, appArguments) + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", APP_LOCATOR_LABEL) + .list() + .getItems + .get(0) + Eventually.eventually(TIMEOUT, INTERVAL) { + expectedLogOnCompletion.foreach { e => + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPod.getMetadata.getName) + .getLog + .contains(e), "The application did not complete.") + } + } + } + + private def createShuffleServiceDaemonSet(): Unit = { + val ds = kubernetesTestComponents.kubernetesClient.extensions().daemonSets() + .createNew() + .withNewMetadata() + .withName("shuffle") + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewMetadata() + .withLabels(Map("app" -> "spark-shuffle-service").asJava) + .endMetadata() + .withNewSpec() + .addNewVolume() + .withName("shuffle-dir") + .withNewHostPath() + .withPath("/tmp") + .endHostPath() + .endVolume() + .addNewContainer() + .withName("shuffle") + .withImage("spark-shuffle:latest") + .withImagePullPolicy("IfNotPresent") + .addNewVolumeMount() + .withName("shuffle-dir") + .withMountPath("/tmp") + .endVolumeMount() + .endContainer() + .endSpec() + .endTemplate() + .endSpec() + .done() + + // wait for daemonset to become available. 
+ Eventually.eventually(TIMEOUT, INTERVAL) { + val pods = kubernetesTestComponents.kubernetesClient.pods() + .withLabel("app", "spark-shuffle-service").list().getItems + + if (pods.size() == 0 || !Readiness.isReady(pods.get(0))) { + throw ShuffleNotReadyException + } + } + } +} + +private[spark] object KubernetesSuite { + val EXAMPLES_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs") + .toFile + .listFiles()(0) + + val HELPER_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs-helpers") + .toFile + .listFiles()(0) + val SUBMITTER_LOCAL_MAIN_APP_RESOURCE = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" + val CONTAINER_LOCAL_MAIN_APP_RESOURCE = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" + val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + + s"integration-tests-jars/${HELPER_JAR_FILE.getName}" + val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.k8s" + + ".integrationtest.jobs.SparkPiWithInfiniteWait" + val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" + val SPARK_R_MAIN_CLASS = "org.apache.spark.deploy.RRunner" + val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = + "local:///opt/spark/examples/src/main/python/pi.py" + val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION = + "local:///opt/spark/examples/src/main/python/sort.py" + val SPARK_R_DATAFRAME_SUBMITTER_FILE_LOCATION = + "local:///opt/spark/examples/src/main/r/dataframe.R" + val SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION = + "src/test/R/dataframe.R" + val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" + val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.k8s" + + ".integrationtest.jobs.FileExistenceTest" + val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.k8s" + + ".integrationtest.jobs.GroupByTest" + val JAVA_OPTIONS_MAIN_CLASS = "org.apache.spark.deploy.k8s" + + ".integrationtest.jobs.JavaOptionsTest" + val TEST_EXISTENCE_FILE_CONTENTS = "contents" + + case object ShuffleNotReadyException extends Exception +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala new file mode 100644 index 0000000..4af3038 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util.UUID + +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.scalatest.concurrent.Eventually +import scala.collection.JavaConverters._ + +private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { + + val namespace = UUID.randomUUID().toString.replaceAll("-", "") + val kubernetesClient = defaultClient.inNamespace(namespace) + val clientConfig = kubernetesClient.getConfiguration + + def createNamespace(): Unit = { + defaultClient.namespaces.createNew() + .withNewMetadata() + .withName(namespace) + .endMetadata() + .done() + } + + def deleteNamespace(): Unit = { + defaultClient.namespaces.withName(namespace).delete() + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val namespaceList = defaultClient + .namespaces() + .list() + .getItems() + .asScala + require(!namespaceList.exists(_.getMetadata.getName == namespace)) + } + } + + def newTestJobConf(): TestAppConf = { + new TestAppConf() + .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") + .set("spark.kubernetes.namespace", namespace) + .set("spark.kubernetes.driver.docker.image", + System.getProperty("spark.docker.test.driverImage", "spark-driver:latest")) + .set("spark.kubernetes.executor.docker.image", + System.getProperty("spark.docker.test.executorImage", "spark-executor:latest")) + .setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath)) + .set("spark.executor.memory", "500m") + .set("spark.executor.cores", "1") + .set("spark.executors.instances", "1") + .set("spark.app.name", "spark-test-app") + .set("spark.ui.enabled", "true") + .set("spark.testing", "false") + .set("spark.kubernetes.submission.waitAppCompletion", "false") + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala new file mode 100644 index 0000000..459c0a4 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Logging.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import org.apache.log4j.{Logger, LogManager, Priority} + +trait Logging { + + private val log: Logger = LogManager.getLogger(this.getClass) + + protected def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg) + + protected def logInfo(msg: => String) = if (log.isInfoEnabled) log.info(msg) + + protected def logWarning(msg: => String) = if (log.isEnabledFor(Priority.WARN)) log.warn(msg) + + protected def logWarning(msg: => String, throwable: Throwable) = + if (log.isEnabledFor(Priority.WARN)) log.warn(msg, throwable) + + protected def logError(msg: => String) = if (log.isEnabledFor(Priority.ERROR)) log.error(msg) +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala new file mode 100644 index 0000000..a851c7d --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.{BufferedReader, InputStreamReader} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.ArrayBuffer + +object ProcessUtils extends Logging { + /** + * executeProcess is used to run a command and return the output if it + * completes within timeout seconds. 
+ */ + def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = { + val pb = new ProcessBuilder().command(fullCommand: _*) + pb.redirectErrorStream(true) + val proc = pb.start() + val outputLines = new ArrayBuffer[String] + + Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => + Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => + var line: String = null + do { + line = bufferedOutput.readLine() + if (line != null) { + logInfo(line) + outputLines += line + } + } while (line != null) + } + } + assert(proc.waitFor(timeout, TimeUnit.SECONDS), + s"Timed out while executing ${fullCommand.mkString(" ")}") + assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}") + outputLines.toSeq + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala new file mode 100644 index 0000000..f1fd6dc --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness + +private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] { + + private val signal = SettableFuture.create[Boolean] + + override def eventReceived(action: Action, resource: T): Unit = { + if ((action == Action.MODIFIED || action == Action.ADDED) && + Readiness.isReady(resource)) { + signal.set(true) + } + } + + override def onClose(cause: KubernetesClientException): Unit = {} + + def waitUntilReady(): Boolean = signal.get(60, TimeUnit.SECONDS) +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala new file mode 100644 index 0000000..74c198f --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import io.fabric8.kubernetes.api.model.{HTTPGetActionBuilder, Pod} +import io.fabric8.kubernetes.client.KubernetesClient + +/** + * Launches a simple HTTP server which provides jars that can be downloaded by Spark applications + * in integration tests. + */ +private[spark] class StaticAssetServerLauncher(kubernetesClient: KubernetesClient) { + + // Returns the HTTP Base URI of the server. + def launchStaticAssetServer(): String = { + val readinessWatcher = new SparkReadinessWatcher[Pod] + val probePingHttpGet = new HTTPGetActionBuilder() + .withNewPort(8080) + .withScheme("HTTP") + .withPath("/") + .build() + Utils.tryWithResource(kubernetesClient + .pods() + .withName("integration-test-static-assets") + .watch(readinessWatcher)) { _ => + val pod = kubernetesClient.pods().createNew() + .withNewMetadata() + .withName("integration-test-static-assets") + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName("static-asset-server-container") + .withImage("spark-integration-test-asset-server:latest") + .withImagePullPolicy("IfNotPresent") + .withNewReadinessProbe() + .withHttpGet(probePingHttpGet) + .endReadinessProbe() + .endContainer() + .endSpec() + .done() + readinessWatcher.waitUntilReady() + val podIP = kubernetesClient.pods().withName(pod.getMetadata.getName).get() + .getStatus + .getPodIP + s"http://$podIP:8080" + } + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala new file mode 100644 index 0000000..11197d7 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +import scala.collection.mutable + +class TestAppConf { + + private val keys = mutable.Map[String, String]() + + def set(key:String, value: String): TestAppConf = { + keys.put(key, value) + this + } + + def setJars(jars: Seq[String]) = set("spark.jars", jars.mkString(",")) +} + +private[spark] case class TestAppArguments( + mainAppResource: String, + mainClass: String, + driverArgs: Array[String], + hadoopConfDir: Option[String]) + +object TestApp { + + def run(testAppConf: TestAppConf, appArguments: TestAppArguments) = None +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala new file mode 100644 index 0000000..e4a8f86 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.{Closeable, InputStream, IOException, OutputStream} + +object Utils { + + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { + val resource = createResource + try f.apply(resource) finally resource.close() + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala new file mode 100644 index 0000000..bae1d2d --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest.backend.GCE + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND + +private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend { + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + var k8ConfBuilder = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(master.replaceFirst("k8s://", "")) + defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build) + } + + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + + override def name(): String = GCE_TEST_BACKEND +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala new file mode 100644 index 0000000..b61daf6 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest.backend + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend} +import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder + +private[spark] trait IntegrationTestBackend { + def name(): String + def initialize(): Unit + def getKubernetesClient(): DefaultKubernetesClient + def cleanUp(): Unit = {} +} + +private[spark] object IntegrationTestBackendFactory { + def getTestBackend(): IntegrationTestBackend = { + Option(System.getProperty("spark.kubernetes.test.master")) + .map(new GCETestBackend(_)) + .getOrElse(new MinikubeTestBackend()) + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala new file mode 100644 index 0000000..b394fa4 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.backend.minikube + +import java.nio.file.Paths + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.commons.lang3.SystemUtils +import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils} + +// TODO support windows +private[spark] object Minikube extends Logging { + private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) { + Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile + } else if (SystemUtils.IS_OS_WINDOWS) { + throw new IllegalStateException("Executing Minikube based integration tests not yet " + + " available on Windows.") + } else { + Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile + } + + private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " + + s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}" + + private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 + + def startMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.RUNNING) { + executeMinikube("start", "--memory", "6000", "--cpus", "8") + } else { + logInfo("Minikube is already started.") + } + } + + def getMinikubeIp: String = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val outputs = executeMinikube("ip") + .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) + assert(outputs.size == 1, "Unexpected amount of output from minikube ip") + outputs.head + } + + def getMinikubeStatus: MinikubeStatus.Value = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + val statusString = executeMinikube("status") + .filter(_.contains("minikube: ")) + .head + .replaceFirst("minikube: ", "") + MinikubeStatus.unapply(statusString) + .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) + } + + def getDockerEnv: Map[String, String] = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + executeMinikube("docker-env", "--shell", "bash") + .filter(_.startsWith("export")) + .map(_.replaceFirst("export ", "").split('=')) + .map(arr => (arr(0), arr(1).replaceAllLiterally("\"", ""))) + .toMap + } + + def deleteMinikube(): Unit = synchronized { + assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) + if (getMinikubeStatus != MinikubeStatus.NONE) { + executeMinikube("delete") + } else { + logInfo("Minikube was already not running.") + } + } + + def getKubernetesClient: DefaultKubernetesClient = synchronized { + val kubernetesMaster = s"https://${getMinikubeIp}:8443" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", 
"ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + new DefaultKubernetesClient(kubernetesConf) + } + + def executeMinikubeSsh(command: String): Unit = { + executeMinikube("ssh", command) + } + + private def executeMinikube(action: String, args: String*): Seq[String] = { + if (!MINIKUBE_EXECUTABLE_DEST.canExecute) { + if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) { + throw new IllegalStateException("Failed to make the Minikube binary executable.") + } + } + ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, + MINIKUBE_STARTUP_TIMEOUT_SECONDS) + } +} + +private[spark] object MinikubeStatus extends Enumeration { + + // The following states are listed according to + // https://github.com/docker/machine/blob/master/libmachine/state/state.go. + val STARTING = status("Starting") + val RUNNING = status("Running") + val PAUSED = status("Paused") + val STOPPING = status("Stopping") + val STOPPED = status("Stopped") + val ERROR = status("Error") + val TIMEOUT = status("Timeout") + val SAVED = status("Saved") + val NONE = status("") + + def status(value: String): Value = new Val(nextId, value) + def unapply(s: String): Option[Value] = values.find(s == _.toString) +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala new file mode 100644 index 0000000..8e94f13 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest.backend.minikube + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND +import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder + +private[spark] class MinikubeTestBackend extends IntegrationTestBackend { + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + Minikube.startMinikube() + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + defaultClient = Minikube.getKubernetesClient + } + + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + + override def cleanUp(): Unit = { + if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { + Minikube.deleteMinikube() + } + } + + override def name(): String = MINIKUBE_TEST_BACKEND +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala new file mode 100644 index 0000000..0807a68 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +package object constants { + val MINIKUBE_TEST_BACKEND = "minikube" + val GCE_TEST_BACKEND = "gce" +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala new file mode 100644 index 0000000..ea98e6c --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.docker + +import java.io.File +import java.net.URI +import java.nio.file.Paths + +import scala.collection.JavaConverters._ + +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} +import org.apache.http.client.utils.URIBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} + +import org.apache.spark.deploy.k8s.integrationtest.Logging + +private[spark] class SparkDockerImageBuilder + (private val dockerEnv: Map[String, String]) extends Logging{ + + private val DOCKER_BUILD_PATH = Paths.get("target", "docker") + // Dockerfile paths must be relative to the build path. + private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" + private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" + private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" + private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile" + private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" + private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" + private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile" + private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" + private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" + private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" + private val STATIC_ASSET_SERVER_DOCKER_FILE = + "dockerfiles/integration-test-asset-server/Dockerfile" + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", + throw new IllegalStateException("DOCKER_HOST env not found.")) + + private val originalDockerUri = URI.create(dockerHost) + private val httpsDockerUri = new URIBuilder() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() + + private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + + private val dockerClient = new DefaultDockerClient.Builder() + .uri(httpsDockerUri) + .dockerCertificates(DockerCertificates + .builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build().get()) + .build() + + def buildSparkDockerImages(): Unit = { + Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + buildImage("spark-base", BASE_DOCKER_FILE) + buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) + buildImage("spark-driver-r", DRIVERR_DOCKER_FILE) + buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) + buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE) + buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) + buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) + buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) + buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE) + } + + private def buildImage(name: String, dockerFile: String): Unit = { + dockerClient.build( + DOCKER_BUILD_PATH, + name, + dockerFile, + new LoggingBuildHandler()) + } +} diff --git a/integration-test/test-data/input.txt 
b/integration-test/test-data/input.txt new file mode 100644 index 0000000..dfe437b --- /dev/null +++ b/integration-test/test-data/input.txt @@ -0,0 +1 @@ +Contents From 7c8612ce82a3401f8db12843930e8f0f328108e8 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 15:44:41 -0800 Subject: [PATCH 02/22] Clean up pom.xml --- integration-test/pom.xml | 99 ---------------------------------------- 1 file changed, 99 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 479ce6b..eb7e58d 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -23,7 +23,6 @@ 0.1-SNAPSHOT 3.5 - 2.6.5 1.3.9 3.0.0 1.2.17 @@ -40,26 +39,6 @@ commons-logging 1.1.1 - - com.fasterxml.jackson.core - jackson-annotations - ${fasterxml.jackson.version} - - - com.fasterxml.jackson.core - jackson-core - ${fasterxml.jackson.version} - - - com.fasterxml.jackson.core - jackson-databind - ${fasterxml.jackson.version} - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - ${fasterxml.jackson.version} - com.google.code.findbugs jsr305 @@ -77,55 +56,11 @@ docker-client 5.0.2 test - - - - com.fasterxml.jackson.jaxrs - jackson-jaxrs-json-provider - - - com.fasterxml.jackson.core - jackson-databind - - - org.glassfish.jersey.core - jersey-client - - - org.glassfish.jersey.core - jersey-common - - - javax.ws.rs - jsr311-api - - io.fabric8 kubernetes-client ${kubernetes.client.version} - - - com.fasterxml.jackson.core - jackson-core - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - com.fasterxml.jackson.dataformat - jackson-dataformat-yaml - - log4j @@ -185,40 +120,6 @@ - - copy-integration-python - pre-integration-test - - copy-resources - - - ${project.build.directory}/docker/python - - - ${project.parent.basedir}/python - - ${project.parent.basedir}/python/.egg - ${project.parent.basedir}/python/dist - - - - - - - copy-integration-r - pre-integration-test - - copy-resources - - - ${project.build.directory}/docker/R - - - ${project.parent.basedir}/R - - - - copy-integration-data pre-integration-test From 5a950eb6eb5c011bfc77577f95d7ed9a7e16226f Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 16:09:41 -0800 Subject: [PATCH 03/22] Unpack distro in place --- integration-test/pom.xml | 43 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index eb7e58d..480371d 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -29,6 +29,7 @@ 2.11.8 2.11 kubernetes-integration-tests + YOUR-SPARK-DISTRO-HERE jar Spark Project Kubernetes Integration Tests @@ -100,6 +101,48 @@ + + org.codehaus.mojo + truezip-maven-plugin + 1.2 + + + unpack-spark-distro + + copy + + pre-integration-test + + + ${spark-distro-archive} + ${project.build.directory}/spark-distro-archive + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.4.0 + + + rename-spark-distro-output-dir + pre-integration-test + + exec + + + ${project.build.directory}/spark-distro-archive + /bin/sh + + -c + mv * ../docker + + + + + maven-resources-plugin 3.0.2 From b14430e0f73ea22cf0f7c439136045004211e8b5 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 16:14:54 -0800 Subject: [PATCH 04/22] Clean up redundant resources --- integration-test/pom.xml | 64 ---------------------------------------- 1 file changed, 64 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 480371d..11c00e6 100644 --- 
a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -163,70 +163,6 @@ - - copy-integration-data - pre-integration-test - - copy-resources - - - ${project.build.directory}/docker/data - - - ${project.parent.basedir}/data - true - - - - - - copy-integration-licenses - pre-integration-test - - copy-resources - - - ${project.build.directory}/docker/licenses - - - ${project.parent.basedir}/licenses - true - - - - - - copy-integration-examples-jar - pre-integration-test - - copy-resources - - - ${project.build.directory}/docker/examples/jars - - - ${project.parent.basedir}/examples/target/scala-2.11/jars - true - - - - - - copy-integration-examples-src - pre-integration-test - - copy-resources - - - ${project.build.directory}/docker/examples/src/main - - - ${project.parent.basedir}/examples/src/main - true - - - - From 0443af8bc052b2851d857a53f26dd06ab309fcac Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 16:41:26 -0800 Subject: [PATCH 05/22] Avoid buggy truezip --- integration-test/pom.xml | 45 +++------------------------------------- 1 file changed, 3 insertions(+), 42 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 11c00e6..2f3441c 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -29,7 +29,7 @@ 2.11.8 2.11 kubernetes-integration-tests - YOUR-SPARK-DISTRO-HERE + YOUR-SPARK-DISTRO-TARBALL-HERE jar Spark Project Kubernetes Integration Tests @@ -101,26 +101,6 @@ - - org.codehaus.mojo - truezip-maven-plugin - 1.2 - - - unpack-spark-distro - - copy - - pre-integration-test - - - ${spark-distro-archive} - ${project.build.directory}/spark-distro-archive - - - - - org.codehaus.mojo exec-maven-plugin @@ -133,11 +113,11 @@ exec - ${project.build.directory}/spark-distro-archive + ${project.build.directory} /bin/sh -c - mv * ../docker + mkdir spark-distro-unpacked; cd spark-distro-unpacked; tar xfz ${spark-distro-tgz}; mv * ../docker @@ -165,25 +145,6 @@ - - org.apache.maven.plugins - maven-antrun-plugin - 1.6 - - - create-release-file - pre-integration-test - - run - - - - - - - - - com.googlecode.maven-download-plugin download-maven-plugin From 567ad3eab6eeba500b1420b597fbe5cd564bb15f Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 16:51:58 -0800 Subject: [PATCH 06/22] Cleaned up docker image builder --- .../docker/SparkDockerImageBuilder.scala | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index ea98e6c..b1cb4c8 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -16,12 +16,9 @@ */ package org.apache.spark.deploy.k8s.integrationtest.docker -import java.io.File import java.net.URI import java.nio.file.Paths -import scala.collection.JavaConverters._ - import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} import org.apache.http.client.utils.URIBuilder import org.scalatest.concurrent.{Eventually, PatienceConfiguration} @@ -36,14 +33,7 @@ private[spark] class SparkDockerImageBuilder // Dockerfile paths must be relative to the build path. 
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" - private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" - private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile" private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" - private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" - private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile" - private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" - private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" - private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" private val STATIC_ASSET_SERVER_DOCKER_FILE = "dockerfiles/integration-test-asset-server/Dockerfile" private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) @@ -73,14 +63,7 @@ private[spark] class SparkDockerImageBuilder Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) - buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) - buildImage("spark-driver-r", DRIVERR_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) - buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) - buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE) - buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) - buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) - buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE) } From 74c158b703947fc3043f5fcc96bc7ddba4ec807f Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 17:00:05 -0800 Subject: [PATCH 07/22] Builds some docker images --- integration-test/pom.xml | 6 +++--- .../integrationtest/docker/SparkDockerImageBuilder.scala | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 2f3441c..76aa673 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -107,7 +107,7 @@ 1.4.0 - rename-spark-distro-output-dir + unpack-spark-distro pre-integration-test exec @@ -117,7 +117,7 @@ /bin/sh -c - mkdir spark-distro-unpacked; cd spark-distro-unpacked; tar xfz ${spark-distro-tgz}; mv * ../docker + mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp @@ -134,7 +134,7 @@ copy-resources - ${project.build.directory}/docker/dockerfiles + ${project.build.directory}/spark-distro/dockerfiles src/main/docker diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index b1cb4c8..c17e736 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -29,7 +29,7 @@ import org.apache.spark.deploy.k8s.integrationtest.Logging private[spark] class SparkDockerImageBuilder (private val dockerEnv: Map[String, String]) extends Logging{ - private val DOCKER_BUILD_PATH = Paths.get("target", "docker") + private val DOCKER_BUILD_PATH = Paths.get("target", "spark-distro") // Dockerfile paths must be 
relative to the build path. private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" @@ -73,5 +73,6 @@ private[spark] class SparkDockerImageBuilder name, dockerFile, new LoggingBuildHandler()) + logInfo(s"Built $name docker image") } } From bde1cf9881c863d53d138d90816554ab2ec8af0e Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 17:28:38 -0800 Subject: [PATCH 08/22] Drop http jar support for now --- .../k8s/integrationtest/KubernetesSuite.scala | 13 ------------- .../docker/SparkDockerImageBuilder.scala | 3 --- 2 files changed, 16 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 0642e13..b7b6611 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -37,13 +37,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") private var kubernetesTestComponents: KubernetesTestComponents = _ private var testAppConf: TestAppConf = _ - private var staticAssetServerLauncher: StaticAssetServerLauncher = _ override def beforeAll(): Unit = { testBackend.initialize() kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) - staticAssetServerLauncher = new StaticAssetServerLauncher( - kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace)) } override def afterAll(): Unit = { @@ -69,16 +66,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE) } - test("Use remote resources without the resource staging server.") { - assume(testBackend.name == MINIKUBE_TEST_BACKEND) - val assetServerUri = staticAssetServerLauncher.launchStaticAssetServer() - testAppConf.setJars(Seq( - s"$assetServerUri/${EXAMPLES_JAR_FILE.getName}", - s"$assetServerUri/${HELPER_JAR_FILE.getName}" - )) - runSparkPiAndVerifyCompletion() - } - test("Submit small local files without the resource staging server.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index c17e736..b17e3ba 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -34,8 +34,6 @@ private[spark] class SparkDockerImageBuilder private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" - private val STATIC_ASSET_SERVER_DOCKER_FILE = - "dockerfiles/integration-test-asset-server/Dockerfile" private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) private val dockerHost = 
dockerEnv.getOrElse("DOCKER_HOST", @@ -64,7 +62,6 @@ private[spark] class SparkDockerImageBuilder buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) - buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE) } private def buildImage(name: String, dockerFile: String): Unit = { From 86245457c5b759f57c395317105bb1bcd4b4919c Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 15 Dec 2017 17:36:38 -0800 Subject: [PATCH 09/22] Clean up --- .../k8s/integrationtest/KubernetesSuite.scala | 43 ------------------- 1 file changed, 43 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index b7b6611..3fdfb74 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -127,49 +127,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } } } - - private def createShuffleServiceDaemonSet(): Unit = { - val ds = kubernetesTestComponents.kubernetesClient.extensions().daemonSets() - .createNew() - .withNewMetadata() - .withName("shuffle") - .endMetadata() - .withNewSpec() - .withNewTemplate() - .withNewMetadata() - .withLabels(Map("app" -> "spark-shuffle-service").asJava) - .endMetadata() - .withNewSpec() - .addNewVolume() - .withName("shuffle-dir") - .withNewHostPath() - .withPath("/tmp") - .endHostPath() - .endVolume() - .addNewContainer() - .withName("shuffle") - .withImage("spark-shuffle:latest") - .withImagePullPolicy("IfNotPresent") - .addNewVolumeMount() - .withName("shuffle-dir") - .withMountPath("/tmp") - .endVolumeMount() - .endContainer() - .endSpec() - .endTemplate() - .endSpec() - .done() - - // wait for daemonset to become available. 
- Eventually.eventually(TIMEOUT, INTERVAL) { - val pods = kubernetesTestComponents.kubernetesClient.pods() - .withLabel("app", "spark-shuffle-service").list().getItems - - if (pods.size() == 0 || !Readiness.isReady(pods.get(0))) { - throw ShuffleNotReadyException - } - } - } } private[spark] object KubernetesSuite { From 5585a0a4dbc99d9c8c6228d265f1077895e27a2e Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Sat, 16 Dec 2017 14:51:28 -0800 Subject: [PATCH 10/22] Use spark-submit --- .../k8s/integrationtest/KubernetesSuite.scala | 87 +++++-------------- .../KubernetesTestComponents.scala | 50 ++++++++++- .../deploy/k8s/integrationtest/TestApp.scala | 42 --------- .../minikube/MinikubeTestBackend.scala | 4 +- .../k8s/integrationtest/constants.scala | 3 + .../docker/SparkDockerImageBuilder.scala | 3 +- 6 files changed, 76 insertions(+), 113 deletions(-) delete mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 3fdfb74..5096572 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -19,9 +19,10 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.{File, FileOutputStream} import java.nio.file.Paths import java.util.{Properties, UUID} +import java.util.regex.Pattern import com.google.common.base.Charsets -import com.google.common.io.Files +import com.google.common.io.{Files, PatternFilenameFilter} import io.fabric8.kubernetes.client.internal.readiness.Readiness import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} @@ -30,13 +31,14 @@ import scala.collection.JavaConverters._ import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND +import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { import KubernetesSuite._ private val testBackend = IntegrationTestBackendFactory.getTestBackend() private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") private var kubernetesTestComponents: KubernetesTestComponents = _ - private var testAppConf: TestAppConf = _ + private var sparkAppConf: SparkAppConf = _ override def beforeAll(): Unit = { testBackend.initialize() @@ -48,7 +50,7 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } before { - testAppConf = kubernetesTestComponents.newTestJobConf() + sparkAppConf = kubernetesTestComponents.newSparkAppConf() .set("spark.kubernetes.initcontainer.docker.image", "spark-init:latest") .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) @@ -59,40 +61,21 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit kubernetesTestComponents.deleteNamespace() } - test("Use container-local resources without the resource staging server") { + test("Run SparkPi with no resources") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - 
testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) - runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE) + runSparkPiAndVerifyCompletion() } - test("Submit small local files without the resource staging server.") { - assume(testBackend.name == MINIKUBE_TEST_BACKEND) - testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) - val testExistenceFileTempDir = Files.createTempDir() - testExistenceFileTempDir.deleteOnExit() - val testExistenceFile = new File(testExistenceFileTempDir, "input.txt") - Files.write(TEST_EXISTENCE_FILE_CONTENTS, testExistenceFile, Charsets.UTF_8) - testAppConf.set("spark.files", testExistenceFile.getAbsolutePath) - runSparkApplicationAndVerifyCompletion( - CONTAINER_LOCAL_MAIN_APP_RESOURCE, - FILE_EXISTENCE_MAIN_CLASS, - Seq( - s"File found at /opt/spark/work-dir/${testExistenceFile.getName} with correct contents.", - s"File found on the executors at the relative path ${testExistenceFile.getName} with" + - s" the correct contents."), - Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS)) - } - - test("Use a very long application name.") { + test("Run SparkPi with a very long application name.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) - testAppConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) - .set("spark.app.name", "long" * 40) - runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE) + sparkAppConf.set("spark.app.name", "long" * 40) + runSparkPiAndVerifyCompletion() } - private def runSparkPiAndVerifyCompletion(appResource: String = ""): Unit = { + private def runSparkPiAndVerifyCompletion( + appResource: String = CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR): Unit = { runSparkApplicationAndVerifyCompletion( appResource, SPARK_PI_MAIN_CLASS, @@ -105,12 +88,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit mainClass: String, expectedLogOnCompletion: Seq[String], appArgs: Array[String]): Unit = { - val appArguments = TestAppArguments( + val appArguments = SparkAppArguments( mainAppResource = appResource, - mainClass = mainClass, - driverArgs = appArgs, - hadoopConfDir = None) - TestApp.run(testAppConf, appArguments) + mainClass = mainClass) + SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt) val driverPod = kubernetesTestComponents.kubernetesClient .pods() .withLabel("spark-app-locator", APP_LOCATOR_LABEL) @@ -130,40 +111,16 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } private[spark] object KubernetesSuite { - val EXAMPLES_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs") - .toFile - .listFiles()(0) - val HELPER_JAR_FILE = Paths.get("target", "integration-tests-spark-jobs-helpers") - .toFile - .listFiles()(0) - val SUBMITTER_LOCAL_MAIN_APP_RESOURCE = s"file://${EXAMPLES_JAR_FILE.getAbsolutePath}" - val CONTAINER_LOCAL_MAIN_APP_RESOURCE = s"local:///opt/spark/examples/" + - s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" - val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + - s"integration-tests-jars/${HELPER_JAR_FILE.getName}" val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) - val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.k8s" + - ".integrationtest.jobs.SparkPiWithInfiniteWait" - val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" - val SPARK_R_MAIN_CLASS = "org.apache.spark.deploy.RRunner" - val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = - 
"local:///opt/spark/examples/src/main/python/pi.py" - val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION = - "local:///opt/spark/examples/src/main/python/sort.py" - val SPARK_R_DATAFRAME_SUBMITTER_FILE_LOCATION = - "local:///opt/spark/examples/src/main/r/dataframe.R" - val SPARK_R_DATAFRAME_CONTAINER_LOCAL_FILE_LOCATION = - "src/test/R/dataframe.R" - val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" - val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.k8s" + - ".integrationtest.jobs.FileExistenceTest" - val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.k8s" + - ".integrationtest.jobs.GroupByTest" - val JAVA_OPTIONS_MAIN_CLASS = "org.apache.spark.deploy.k8s" + - ".integrationtest.jobs.JavaOptionsTest" - val TEST_EXISTENCE_FILE_CONTENTS = "contents" + val SPARK_DISTRO_EXAMPLES_JAR_FILE: File = Paths.get(SPARK_DISTRO_PATH.toFile.getAbsolutePath, + "examples", "jars") + .toFile + .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0) + val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR: String = s"local:///opt/spark/examples/" + + s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}" + val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" case object ShuffleNotReadyException extends Exception } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 4af3038..b480940 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -16,12 +16,16 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import java.nio.file.Paths import java.util.UUID import io.fabric8.kubernetes.client.DefaultKubernetesClient -import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import scala.collection.mutable import scala.collection.JavaConverters._ +import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH + private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { val namespace = UUID.randomUUID().toString.replaceAll("-", "") @@ -48,15 +52,14 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl } } - def newTestJobConf(): TestAppConf = { - new TestAppConf() + def newSparkAppConf(): SparkAppConf = { + new SparkAppConf() .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") .set("spark.kubernetes.namespace", namespace) .set("spark.kubernetes.driver.docker.image", System.getProperty("spark.docker.test.driverImage", "spark-driver:latest")) .set("spark.kubernetes.executor.docker.image", System.getProperty("spark.docker.test.executorImage", "spark-executor:latest")) - .setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath)) .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") .set("spark.executors.instances", "1") @@ -66,3 +69,42 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl .set("spark.kubernetes.submission.waitAppCompletion", "false") } } + +private[spark] class SparkAppConf { + + private val map = mutable.Map[String, String]() + + def set(key:String, value: String): SparkAppConf = { + map.put(key, value) + this + } + + def get(key: String): String = map.getOrElse(key, "") + + def setJars(jars: 
Seq[String]) = set("spark.jars", jars.mkString(",")) + + override def toString: String = map.toString + + def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}")) +} + +private[spark] case class SparkAppArguments( + mainAppResource: String, + mainClass: String) + +private[spark] object SparkAppLauncher extends Logging { + + private val SPARK_SUBMIT_EXECUTABLE_DEST = Paths.get(SPARK_DISTRO_PATH.toFile.getAbsolutePath, + "bin", "spark-submit").toFile + + def launch(appArguments: SparkAppArguments, appConf: SparkAppConf, timeoutSecs: Int): Unit = { + logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") + val commandLine = Array(SPARK_SUBMIT_EXECUTABLE_DEST.getAbsolutePath, + "--deploy-mode", "cluster", + "--class", appArguments.mainClass, + "--master", appConf.get("spark.master") + ) ++ appConf.toStringArray :+ appArguments.mainAppResource + logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}") + ProcessUtils.executeProcess(commandLine, timeoutSecs) + } +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala deleted file mode 100644 index 11197d7..0000000 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/TestApp.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.deploy.k8s.integrationtest - -import scala.collection.mutable - -class TestAppConf { - - private val keys = mutable.Map[String, String]() - - def set(key:String, value: String): TestAppConf = { - keys.put(key, value) - this - } - - def setJars(jars: Seq[String]) = set("spark.jars", jars.mkString(",")) -} - -private[spark] case class TestAppArguments( - mainAppResource: String, - mainClass: String, - driverArgs: Array[String], - hadoopConfDir: Option[String]) - -object TestApp { - - def run(testAppConf: TestAppConf, appArguments: TestAppArguments) = None -} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala index 8e94f13..7a1433e 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -27,7 +27,9 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend { override def initialize(): Unit = { Minikube.startMinikube() - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) { + new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + } defaultClient = Minikube.getKubernetesClient } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala index 0807a68..9137199 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala @@ -16,7 +16,10 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import java.nio.file.Paths + package object constants { val MINIKUBE_TEST_BACKEND = "minikube" val GCE_TEST_BACKEND = "gce" + val SPARK_DISTRO_PATH = Paths.get("target", "spark-distro") } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index b17e3ba..5c43ed6 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -24,12 +24,13 @@ import org.apache.http.client.utils.URIBuilder import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} +import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH import org.apache.spark.deploy.k8s.integrationtest.Logging private[spark] class SparkDockerImageBuilder (private val dockerEnv: Map[String, String]) extends Logging{ - private val DOCKER_BUILD_PATH = Paths.get("target", "spark-distro") + private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH // Dockerfile paths must be relative to the build path. 
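(With `DOCKER_BUILD_PATH` now pointing at the unpacked distro, every Dockerfile path stays relative to that directory. A hedged sketch of the underlying Spotify docker-client call, shown outside the builder class; `fromEnv()` and the image name are illustrative assumptions, while the real builder constructs its client from the minikube `DOCKER_*` environment as shown elsewhere in these patches:)

```scala
import java.nio.file.Paths
import com.spotify.docker.client.{DefaultDockerClient, LoggingBuildHandler}

// Hedged sketch: build a single image from a Dockerfile path relative to the build directory.
val buildPath = Paths.get("target", "spark-distro")
val dockerClient = DefaultDockerClient.fromEnv().build() // assumes a reachable local Docker daemon
dockerClient.build(buildPath, "spark-base", "dockerfiles/spark-base/Dockerfile",
  new LoggingBuildHandler())
```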
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" From f793e387bae5a083a7cded7a366b5d0ed3386f17 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Sat, 16 Dec 2017 15:01:50 -0800 Subject: [PATCH 11/22] Tests pass --- integration-test/pom.xml | 22 ------- .../integration-test-asset-server/Dockerfile | 21 ------- .../k8s/integrationtest/KubernetesSuite.scala | 9 +-- .../KubernetesTestComponents.scala | 2 +- .../StaticAssetServerLauncher.scala | 62 ------------------- .../deploy/k8s/integrationtest/Utils.scala | 2 +- .../backend/IntegrationTestBackend.scala | 3 +- 7 files changed, 6 insertions(+), 115 deletions(-) delete mode 100644 integration-test/src/main/docker/integration-test-asset-server/Dockerfile delete mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 76aa673..b3d59a1 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -123,28 +123,6 @@ - - maven-resources-plugin - 3.0.2 - - - copy-integration-test-http-server-dockerfile - pre-integration-test - - copy-resources - - - ${project.build.directory}/spark-distro/dockerfiles - - - src/main/docker - true - - - - - - com.googlecode.maven-download-plugin download-maven-plugin diff --git a/integration-test/src/main/docker/integration-test-asset-server/Dockerfile b/integration-test/src/main/docker/integration-test-asset-server/Dockerfile deleted file mode 100644 index e26d207..0000000 --- a/integration-test/src/main/docker/integration-test-asset-server/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Simple asset server that can provide the integration test jars over HTTP. 
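(With the static asset server gone, the SparkPi test depends only on the examples jar shipped inside the distro image. For reference, a hedged sketch of how that jar is located in the unpacked distro and mapped to its in-container `local://` path; it mirrors the constants added in the previous patch and corrected in the KubernetesSuite hunk below:)

```scala
import java.io.File
import java.nio.file.Paths
import java.util.regex.Pattern
import com.google.common.io.PatternFilenameFilter

// Hedged sketch: find the spark-examples jar under the unpacked distribution...
val examplesJar: File = Paths.get("target", "spark-distro", "examples", "jars")
  .toFile
  .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)

// ...and refer to it by the path it has inside the driver and executor images.
val containerLocalJar = s"local:///opt/spark/examples/jars/${examplesJar.getName}"
```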
-FROM trinitronx/python-simplehttpserver:travis-12 - -ADD examples/integration-tests-jars /var/www diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 5096572..936f78b 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -16,18 +16,15 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import java.io.{File, FileOutputStream} +import java.io.File import java.nio.file.Paths -import java.util.{Properties, UUID} +import java.util.UUID import java.util.regex.Pattern -import com.google.common.base.Charsets import com.google.common.io.{Files, PatternFilenameFilter} -import io.fabric8.kubernetes.client.internal.readiness.Readiness import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} -import scala.collection.JavaConverters._ import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND @@ -118,7 +115,7 @@ private[spark] object KubernetesSuite { "examples", "jars") .toFile .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0) - val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR: String = s"local:///opt/spark/examples/" + + val CONTAINER_LOCAL_SPARK_DISTRO_EXAMPLES_JAR: String = s"local:///opt/spark/examples/jars/" + s"${SPARK_DISTRO_EXAMPLES_JAR_FILE.getName}" val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index b480940..37bb5c1 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -20,7 +20,7 @@ import java.nio.file.Paths import java.util.UUID import io.fabric8.kubernetes.client.DefaultKubernetesClient -import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.concurrent.Eventually import scala.collection.mutable import scala.collection.JavaConverters._ diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala deleted file mode 100644 index 74c198f..0000000 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/StaticAssetServerLauncher.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.integrationtest - -import io.fabric8.kubernetes.api.model.{HTTPGetActionBuilder, Pod} -import io.fabric8.kubernetes.client.KubernetesClient - -/** - * Launches a simple HTTP server which provides jars that can be downloaded by Spark applications - * in integration tests. - */ -private[spark] class StaticAssetServerLauncher(kubernetesClient: KubernetesClient) { - - // Returns the HTTP Base URI of the server. - def launchStaticAssetServer(): String = { - val readinessWatcher = new SparkReadinessWatcher[Pod] - val probePingHttpGet = new HTTPGetActionBuilder() - .withNewPort(8080) - .withScheme("HTTP") - .withPath("/") - .build() - Utils.tryWithResource(kubernetesClient - .pods() - .withName("integration-test-static-assets") - .watch(readinessWatcher)) { _ => - val pod = kubernetesClient.pods().createNew() - .withNewMetadata() - .withName("integration-test-static-assets") - .endMetadata() - .withNewSpec() - .addNewContainer() - .withName("static-asset-server-container") - .withImage("spark-integration-test-asset-server:latest") - .withImagePullPolicy("IfNotPresent") - .withNewReadinessProbe() - .withHttpGet(probePingHttpGet) - .endReadinessProbe() - .endContainer() - .endSpec() - .done() - readinessWatcher.waitUntilReady() - val podIP = kubernetesClient.pods().withName(pod.getMetadata.getName).get() - .getStatus - .getPodIP - s"http://$podIP:8080" - } - } -} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index e4a8f86..623a38f 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import java.io.{Closeable, InputStream, IOException, OutputStream} +import java.io.Closeable object Utils { diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala index b61daf6..51f8b96 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -20,8 +20,7 @@ package org.apache.spark.deploy.k8s.integrationtest.backend import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend -import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.{Minikube, MinikubeTestBackend} -import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend private[spark] trait IntegrationTestBackend { def name(): String From 069ae5032e11a3a7b62bf9562be07dfec4ceee7e Mon Sep 17 
00:00:00 2001 From: Kimoon Kim Date: Sat, 16 Dec 2017 16:54:12 -0800 Subject: [PATCH 12/22] Add hacks for dockerfiles and entrypoint.sh --- integration-test/pom.xml | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index b3d59a1..e35d567 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -30,6 +30,7 @@ 2.11 kubernetes-integration-tests YOUR-SPARK-DISTRO-TARBALL-HERE + YOUR-DOCKERFILES-DIR-HERE jar Spark Project Kubernetes Integration Tests @@ -121,6 +122,38 @@ + + + copy-dockerfiles-if-missing + pre-integration-test + + exec + + + ${project.build.directory}/spark-distro + /bin/sh + + -c + test -d dockerfiles || cp -pr ${spark-dockerfiles-dir} dockerfiles + + + + + + set-exec-bit-on-docker-entrypoint-sh + pre-integration-test + + exec + + + ${project.build.directory}/spark-distro/dockerfiles + /bin/chmod + + +x + spark-base/entrypoint.sh + + + @@ -156,8 +189,7 @@ + the test phase. --> org.scalatest scalatest-maven-plugin From 204c51fba562d25953c7fa1eb39d7104945b4d6c Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 12:08:20 -0800 Subject: [PATCH 13/22] Address review comments --- .../k8s/integrationtest/KubernetesSuite.scala | 5 +-- .../KubernetesTestComponents.scala | 7 +++- .../k8s/integrationtest/ProcessUtils.scala | 18 ++++----- .../deploy/k8s/integrationtest/Utils.scala | 40 ++++++++++++++++++- .../backend/GCE/GCETestBackend.scala | 8 ++-- .../backend/minikube/Minikube.scala | 3 ++ .../docker/SparkDockerImageBuilder.scala | 16 ++++---- 7 files changed, 69 insertions(+), 28 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 936f78b..03ea99c 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -21,7 +21,7 @@ import java.nio.file.Paths import java.util.UUID import java.util.regex.Pattern -import com.google.common.io.{Files, PatternFilenameFilter} +import com.google.common.io.PatternFilenameFilter import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} @@ -31,6 +31,7 @@ import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKE import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { + import KubernetesSuite._ private val testBackend = IntegrationTestBackendFactory.getTestBackend() private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "") @@ -48,8 +49,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit before { sparkAppConf = kubernetesTestComponents.newSparkAppConf() - .set("spark.kubernetes.initcontainer.docker.image", "spark-init:latest") - .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) kubernetesTestComponents.createNamespace() } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala 
b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 37bb5c1..13a02b5 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -19,11 +19,12 @@ package org.apache.spark.deploy.k8s.integrationtest import java.nio.file.Paths import java.util.UUID -import io.fabric8.kubernetes.client.DefaultKubernetesClient -import org.scalatest.concurrent.Eventually import scala.collection.mutable import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.scalatest.concurrent.Eventually + import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { @@ -56,6 +57,8 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl new SparkAppConf() .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") .set("spark.kubernetes.namespace", namespace) + // TODO: apache/spark#19995 is changing docker.image to container.image in these properties. + // Update them once the PR is merged. .set("spark.kubernetes.driver.docker.image", System.getProperty("spark.docker.test.driverImage", "spark-driver:latest")) .set("spark.kubernetes.executor.docker.image", diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index a851c7d..ede6e64 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.deploy.k8s.integrationtest -import java.io.{BufferedReader, InputStreamReader} import java.util.concurrent.TimeUnit import scala.collection.mutable.ArrayBuffer +import scala.io.Source object ProcessUtils extends Logging { /** @@ -32,17 +32,13 @@ object ProcessUtils extends Logging { val proc = pb.start() val outputLines = new ArrayBuffer[String] - Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput => - Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) => - var line: String = null - do { - line = bufferedOutput.readLine() - if (line != null) { - logInfo(line) - outputLines += line - } - } while (line != null) + Utils.tryWithResource(Source.fromInputStream(proc.getInputStream, "UTF-8")) { output => + for (line <- output.getLines) { + logInfo(line) + outputLines += line } + }{ + output => output.close() } assert(proc.waitFor(timeout, TimeUnit.SECONDS), s"Timed out while executing ${fullCommand.mkString(" ")}") diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index 623a38f..bf4562d 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -17,11 +17,49 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.Closeable +import java.net.URI -object Utils { +object Utils extends Logging { def tryWithResource[R <: Closeable, 
T](createResource: => R)(f: R => T): T = { val resource = createResource try f.apply(resource) finally resource.close() } + + def tryWithResource[R, T](createResource: => R)(f: R => T)(closeResource: R => T): T = { + val resource = createResource + try f.apply(resource) finally closeResource(resource) + } + + def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { + require(rawMasterURL.startsWith("k8s://"), + "Kubernetes master URL must start with k8s://.") + val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length) + + // To handle master URLs, e.g., k8s://host:port. + if (!masterWithoutK8sPrefix.contains("://")) { + val resolvedURL = s"https://$masterWithoutK8sPrefix" + logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + + s"URL is $resolvedURL.") + return s"k8s://$resolvedURL" + } + + val masterScheme = new URI(masterWithoutK8sPrefix).getScheme + val resolvedURL = masterScheme.toLowerCase match { + case "https" => + masterWithoutK8sPrefix + case "http" => + logWarning("Kubernetes master URL uses HTTP instead of HTTPS.") + masterWithoutK8sPrefix + case null => + val resolvedURL = s"https://$masterWithoutK8sPrefix" + logInfo("No scheme specified for kubernetes master URL, so defaulting to https. Resolved " + + s"URL is $resolvedURL.") + resolvedURL + case _ => + throw new IllegalArgumentException("Invalid Kubernetes master scheme: " + masterScheme) + } + + s"k8s://$resolvedURL" + } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala index bae1d2d..cbb98fa 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest.backend.GCE import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} +import org.apache.spark.deploy.k8s.integrationtest.Utils import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND @@ -25,10 +26,11 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB private var defaultClient: DefaultKubernetesClient = _ override def initialize(): Unit = { - var k8ConfBuilder = new ConfigBuilder() + val k8sConf = new ConfigBuilder() .withApiVersion("v1") - .withMasterUrl(master.replaceFirst("k8s://", "")) - defaultClient = new DefaultKubernetesClient(k8ConfBuilder.build) + .withMasterUrl(Utils.checkAndGetK8sMasterUrl(master).replaceFirst("k8s://", "")) + .build() + defaultClient = new DefaultKubernetesClient(k8sConf) } override def getKubernetesClient(): DefaultKubernetesClient = { diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index b394fa4..c04bd75 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -39,6 +39,9 @@ private[spark] object Minikube extends Logging { private val 
MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 + // NOTE: This and the following methods are synchronized to prevent deleteMinikube from + // destroying the minikube VM while other methods try to use the VM. + // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox. def startMinikube(): Unit = synchronized { assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) if (getMinikubeStatus != MinikubeStatus.RUNNING) { diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index 5c43ed6..0ae0f3e 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -42,20 +42,20 @@ private[spark] class SparkDockerImageBuilder private val originalDockerUri = URI.create(dockerHost) private val httpsDockerUri = new URIBuilder() - .setHost(originalDockerUri.getHost) - .setPort(originalDockerUri.getPort) - .setScheme("https") - .build() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) private val dockerClient = new DefaultDockerClient.Builder() .uri(httpsDockerUri) - .dockerCertificates(DockerCertificates - .builder() - .dockerCertPath(Paths.get(dockerCerts)) - .build().get()) + .dockerCertificates(DockerCertificates.builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build() + .get()) .build() def buildSparkDockerImages(): Unit = { From 6a93a9edfd31d7df0fd84284ceeddfcffbd72e32 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 12:08:53 -0800 Subject: [PATCH 14/22] Clean up pom.xml --- integration-test/pom.xml | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index e35d567..99362dd 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -31,6 +31,7 @@ kubernetes-integration-tests YOUR-SPARK-DISTRO-TARBALL-HERE YOUR-DOCKERFILES-DIR-HERE + jar Spark Project Kubernetes Integration Tests @@ -85,6 +86,12 @@ 2.2.6 test + + org.slf4j + slf4j-log4j12 + 1.7.24 + test + @@ -118,7 +125,7 @@ /bin/sh -c - mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp + rm -rf spark-distro; mkdir spark-distro-tmp; cd spark-distro-tmp; tar xfz ${spark-distro-tgz}; mv * ../spark-distro; cd ..; rm -rf spark-distro-tmp @@ -139,7 +146,7 @@ - + set-exec-bit-on-docker-entrypoint-sh pre-integration-test @@ -192,6 +199,19 @@ the test phase. --> org.scalatest scalatest-maven-plugin + 1.0 + + ${project.build.directory}/surefire-reports + . 
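(The `argLine` configured just below forwards `${extraScalaTestArgs}` into the forked ScalaTest JVM, so anything passed on the Maven command line as `-DextraScalaTestArgs="-Dkey=value"` surfaces in the tests as an ordinary system property. A hedged sketch of how the suite reads such properties; the three keys and defaults shown are the ones used in earlier patches:)

```scala
// Hedged sketch: properties forwarded via extraScalaTestArgs are read with System.getProperty.
val driverImage   = System.getProperty("spark.docker.test.driverImage", "spark-driver:latest")
val executorImage = System.getProperty("spark.docker.test.executorImage", "spark-executor:latest")
val skipBuild     = System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean
```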
+ SparkTestSuite.txt + -ea -Xmx3g -XX:ReservedCodeCacheSize=512m ${extraScalaTestArgs} + + + file:src/test/resources/log4j.properties + true + + ${test.exclude.tags} + test From b3e4ee035d6b45519382beff6970f95191f7378f Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 12:24:36 -0800 Subject: [PATCH 15/22] Add instructions in README.md --- README.md | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 19b41ca..e4e7089 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,72 @@ -# spark-integration -Integration tests for Spark +--- +layout: global +title: Spark on Kubernetes Integration Tests +--- + +# Running the Kubernetes Integration Tests + +Note that the integration test framework is currently being heavily revised and +is subject to change. + +Note that currently the integration tests only run with Java 8. + +Running the integration tests requires a Spark distribution tarball. It also +needs a local path to the directory that contains `Dockerimage` files. + +Once you prepare the inputs, the integration tests can be executed with Maven or +your IDE. Note that when running tests from an IDE, the `pre-integration-test` +phase must be run every time the Spark main code changes. When running tests +from the command line, the `pre-integration-test` phase should automatically be +invoked if the `integration-test` phase is run. + +With Maven, the integration test can be run using the following command: + +``` +$ mvn clean integration-test \ + -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ + -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles +``` + +# Running against an arbitrary cluster + +In order to run against any cluster, use the following: +```sh +$ mvn clean integration-test \ + -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ + -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage=" +``` + +# Preserve the Minikube VM + +The integration tests make use of +[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual +machine and setup a single-node kubernetes cluster within it. By default the vm +is destroyed after the tests are finished. If you want to preserve the vm, e.g. +to reduce the running time of tests during development, you can pass the +property `spark.docker.test.persistMinikube` to the test process: + +``` +$ mvn clean integration-test \ + -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ + -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true +``` + +# Reuse the previous Docker images + +The integration tests build a number of Docker images, which takes some time. +By default, the images are built every time the tests run. You may want to skip +re-building those images during development, if the distribution package did not +change since the last run. You can pass the property +`spark.docker.test.skipBuildImages` to the test process. This will work only if +you have been setting the property `spark.docker.test.persistMinikube`, in the +previous run since the docker daemon run inside the minikube environment. 
Here +is an example: + +``` +$ mvn clean integration-test \ + -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ + -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true" +``` From 989a371094718156cce45b68e59a98d191af4c72 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 13:32:58 -0800 Subject: [PATCH 16/22] Switch to container.image property keys --- .../k8s/integrationtest/KubernetesTestComponents.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 13a02b5..f1852ba 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -57,11 +57,9 @@ private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesCl new SparkAppConf() .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") .set("spark.kubernetes.namespace", namespace) - // TODO: apache/spark#19995 is changing docker.image to container.image in these properties. - // Update them once the PR is merged. - .set("spark.kubernetes.driver.docker.image", + .set("spark.kubernetes.driver.container.image", System.getProperty("spark.docker.test.driverImage", "spark-driver:latest")) - .set("spark.kubernetes.executor.docker.image", + .set("spark.kubernetes.executor.container.image", System.getProperty("spark.docker.test.executorImage", "spark-executor:latest")) .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") From 46d1f5f49a8111d96dce15aaefafbbca6fe3e695 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 13:59:57 -0800 Subject: [PATCH 17/22] Define version properies in pom.xml --- integration-test/pom.xml | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 99362dd..4b191a3 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -23,11 +23,20 @@ 0.1-SNAPSHOT 3.5 + 1.1.1 + 5.0.2 + 1.3.0 + 1.4.0 + 18.0 1.3.9 - 3.0.0 + 3.0.0 1.2.17 2.11.8 2.11 + 3.2.2 + 2.2.6 + 1.0 + 1.7.24 kubernetes-integration-tests YOUR-SPARK-DISTRO-TARBALL-HERE YOUR-DOCKERFILES-DIR-HERE @@ -40,7 +49,7 @@ commons-logging commons-logging - 1.1.1 + ${commons-logging.version} com.google.code.findbugs @@ -52,18 +61,18 @@ guava test - 18.0 + ${guava.version} com.spotify docker-client - 5.0.2 + ${docker-client.version} test io.fabric8 kubernetes-client - ${kubernetes.client.version} + ${kubernetes-client.version} log4j @@ -83,13 +92,13 @@ org.scalatest scalatest_${scala.binary.version} - 2.2.6 + ${scalatest.version} test org.slf4j slf4j-log4j12 - 1.7.24 + ${slf4j-log4j12.version} test @@ -99,7 +108,7 @@ net.alchim31.maven scala-maven-plugin - 3.2.2 + ${scala-maven-plugin.version} @@ -112,7 +121,7 @@ org.codehaus.mojo exec-maven-plugin - 1.4.0 + ${exec-maven-plugin.version} unpack-spark-distro @@ -166,7 +175,7 @@ com.googlecode.maven-download-plugin download-maven-plugin - 1.3.0 + ${download-maven-plugin.version} download-minikube-linux @@ -199,7 +208,7 @@ the test phase. 
--> org.scalatest scalatest-maven-plugin - 1.0 + ${scalatest-maven-plugin.version} ${project.build.directory}/surefire-reports . From 5f889216976f140018bb8e89a0a4b8431a11e53f Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 16:41:14 -0800 Subject: [PATCH 18/22] Fix a bug in pom.xml --- integration-test/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 4b191a3..0440203 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -27,6 +27,7 @@ 5.0.2 1.3.0 1.4.0 + 18.0 1.3.9 3.0.0 From a68bd5f9d93f2f9b5fb1429ab606f0a686f831cd Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Mon, 18 Dec 2017 17:37:22 -0800 Subject: [PATCH 19/22] Clean up and fix README.md --- README.md | 2 +- integration-test/test-data/input.txt | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) delete mode 100644 integration-test/test-data/input.txt diff --git a/README.md b/README.md index e4e7089..6a366e6 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ is subject to change. Note that currently the integration tests only run with Java 8. Running the integration tests requires a Spark distribution tarball. It also -needs a local path to the directory that contains `Dockerimage` files. +needs a local path to the directory that contains `Dockerfile`s. Once you prepare the inputs, the integration tests can be executed with Maven or your IDE. Note that when running tests from an IDE, the `pre-integration-test` diff --git a/integration-test/test-data/input.txt b/integration-test/test-data/input.txt deleted file mode 100644 index dfe437b..0000000 --- a/integration-test/test-data/input.txt +++ /dev/null @@ -1 +0,0 @@ -Contents From 0f8fad9c7a08f1d27692e4beda41df50b973fd31 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 19 Dec 2017 11:03:19 -0800 Subject: [PATCH 20/22] Fix README.md --- README.md | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 6a366e6..c2f507d 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,26 @@ is subject to change. Note that currently the integration tests only run with Java 8. -Running the integration tests requires a Spark distribution tarball. It also -needs a local path to the directory that contains `Dockerfile`s. +Running the integration tests requires a Spark distribution package tarball that +contains Spark jars, submission clients, etc. You can download a tarball from +http://spark.apache.org/downloads.html. Or, you can create a distribution from +source code using `make-distribution.sh`. For example: + +``` +$ git clone git@github.com:apache/spark.git +$ cd spark +$ ./dev/make-distribution.sh --tgz \ + -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver +``` + +The above command will create a tarball like spark-2.3.0-SNAPSHOT-bin.tgz in the +top-level dir. For more details, see the related section in +[building-spark.md](https://github.com/apache/spark/blob/master/docs/building-spark.md#building-a-runnable-distribution) + + +The integration tests also need a local path to the directory that +contains `Dockerfile`s. In the main spark repo, the path is +`/spark/resource-managers/kubernetes/docker/src/main/dockerfiles`. Once you prepare the inputs, the integration tests can be executed with Maven or your IDE. 
Note that when running tests from an IDE, the `pre-integration-test` @@ -23,8 +41,8 @@ With Maven, the integration test can be run using the following command: ``` $ mvn clean integration-test \ - -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ - -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ + -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles ``` # Running against an arbitrary cluster @@ -32,8 +50,8 @@ $ mvn clean integration-test \ In order to run against any cluster, use the following: ```sh $ mvn clean integration-test \ - -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ - -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ + -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage=" ``` @@ -48,8 +66,8 @@ property `spark.docker.test.persistMinikube` to the test process: ``` $ mvn clean integration-test \ - -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ - -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ + -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true ``` @@ -66,7 +84,7 @@ is an example: ``` $ mvn clean integration-test \ - -Dspark-distro-tgz=/tmp/spark-2.3.0-SNAPSHOT-bin-20171216-0c8fca4608.tgz \ - -Dspark-dockerfiles-dir=.../spark/resource-managers/kubernetes/docker/src/main/dockerfiles + -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ + -Dspark-dockerfiles-dir=spark/resource-managers/kubernetes/docker/src/main/dockerfiles "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true" ``` From 7bc4aa28e00ae5fd93bd83d5ed4e9a1f36423261 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 19 Dec 2017 11:11:51 -0800 Subject: [PATCH 21/22] Remove unnecessary close --- .../spark/deploy/k8s/integrationtest/ProcessUtils.scala | 9 +++------ .../apache/spark/deploy/k8s/integrationtest/Utils.scala | 5 ----- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index ede6e64..7dcde04 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -32,14 +32,11 @@ object ProcessUtils extends Logging { val proc = pb.start() val outputLines = new ArrayBuffer[String] - Utils.tryWithResource(Source.fromInputStream(proc.getInputStream, "UTF-8")) { output => - for (line <- output.getLines) { + Utils.tryWithResource(proc.getInputStream)( + Source.fromInputStream(_, "UTF-8").getLines().foreach { line => logInfo(line) outputLines += line - } - }{ - output => output.close() - } + }) assert(proc.waitFor(timeout, TimeUnit.SECONDS), s"Timed out while executing ${fullCommand.mkString(" ")}") assert(proc.exitValue == 0, s"Failed to execute 
${fullCommand.mkString(" ")}") diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index bf4562d..911b3a9 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -26,11 +26,6 @@ object Utils extends Logging { try f.apply(resource) finally resource.close() } - def tryWithResource[R, T](createResource: => R)(f: R => T)(closeResource: R => T): T = { - val resource = createResource - try f.apply(resource) finally closeResource(resource) - } - def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { require(rawMasterURL.startsWith("k8s://"), "Kubernetes master URL must start with k8s://.") From 9ec071a2dc2005f5d4cebd2457674b29254c2185 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Tue, 19 Dec 2017 11:12:49 -0800 Subject: [PATCH 22/22] Clean up --- .../apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala index 7dcde04..e9f143c 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -31,7 +31,6 @@ object ProcessUtils extends Logging { pb.redirectErrorStream(true) val proc = pb.start() val outputLines = new ArrayBuffer[String] - Utils.tryWithResource(proc.getInputStream)( Source.fromInputStream(_, "UTF-8").getLines().foreach { line => logInfo(line)
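
The clean-up in PATCH 21 and PATCH 22 relies on the single-argument `tryWithResource` loan-pattern helper that `Utils.scala` keeps: it closes the resource in a `finally` block, which is why the extra `closeResource` overload and the explicit `output.close()` call could be removed. Below is a minimal, self-contained Scala sketch of that pattern; the object name, the simplified `Closeable` type bound, and the `echo` command are illustrative assumptions for the example, not code taken from the patches.

```scala
import java.io.Closeable

import scala.io.Source

object TryWithResourceSketch {
  // Loan-pattern helper: the resource is always closed, even if `f` throws.
  // This mirrors the single-argument overload kept in Utils.scala by PATCH 21
  // (the exact type bound there may differ; Closeable is assumed here).
  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
    val resource = createResource
    try f(resource) finally resource.close()
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical usage, loosely following ProcessUtils.executeProcess after PATCH 21:
    // stream a process's stdout and handle each line, with no manual close() call needed.
    val proc = new ProcessBuilder("echo", "hello").redirectErrorStream(true).start()
    tryWithResource(proc.getInputStream) { in =>
      Source.fromInputStream(in, "UTF-8").getLines().foreach(println)
    }
    proc.waitFor()
  }
}
```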