From eb4bac8de1e2c68abf52f6915328a611dee994aa Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 1 Feb 2017 14:16:27 -0800 Subject: [PATCH 1/2] Allow adding arbitrary files --- .../org/apache/spark/deploy/SparkSubmit.scala | 2 + .../spark/deploy/SparkSubmitArguments.scala | 7 ++ .../launcher/SparkSubmitOptionParser.java | 4 +- .../spark/deploy/kubernetes/Client.scala | 34 ++++++-- .../spark/deploy/kubernetes/config.scala | 12 ++- .../rest/KubernetesRestProtocolMessages.scala | 3 +- .../rest/kubernetes/CompressionUtils.scala | 8 +- .../KubernetesSparkRestServer.scala | 53 ++++++++---- .../jobs/FileExistenceTest.scala | 54 ++++++++++++ .../integrationtest/KubernetesSuite.scala | 85 +++++++++++++++++-- .../integration-tests/test-data/input.txt | 1 + 11 files changed, 231 insertions(+), 32 deletions(-) create mode 100644 resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala create mode 100644 resource-managers/kubernetes/integration-tests/test-data/input.txt diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fcbd5ef7cc67..5874511820a3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -464,6 +464,8 @@ object SparkSubmit extends CommandLineUtils { sysProp = "spark.kubernetes.namespace"), OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER, sysProp = "spark.kubernetes.driver.uploads.jars"), + OptionAssigner(args.kubernetesUploadFiles, KUBERNETES, CLUSTER, + sysProp = "spark.kubernetes.driver.uploads.files"), // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 9bce824a43e1..9eb1a16df809 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -74,6 +74,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S // Kubernetes only var kubernetesNamespace: String = null var kubernetesUploadJars: String = null + var kubernetesUploadFiles: String = null // Standalone cluster mode only var supervise: Boolean = false @@ -196,6 +197,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S kubernetesUploadJars = Option(kubernetesUploadJars) .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars")) .orNull + kubernetesUploadFiles = Option(kubernetesUploadFiles) + .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files")) + .orNull // Try to set main class from JAR if no --class argument is given if (mainClass == null && !isPython && !isR && primaryResource != null) { @@ -440,6 +444,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case KUBERNETES_UPLOAD_JARS => kubernetesUploadJars = value + case KUBERNETES_UPLOAD_FILES => + kubernetesUploadFiles = value + case HELP => printUsageAndExit(0) diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index f1dac20f52f0..3369b5d8301b 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ 
b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -80,6 +80,7 @@ class SparkSubmitOptionParser { protected final String KUBERNETES_MASTER = "--kubernetes-master"; protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace"; protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars"; + protected final String KUBERNETES_UPLOAD_FILES = "--upload-files"; /** * This is the canonical list of spark-submit options. Each entry in the array contains the @@ -122,7 +123,8 @@ class SparkSubmitOptionParser { { TOTAL_EXECUTOR_CORES }, { KUBERNETES_MASTER }, { KUBERNETES_NAMESPACE }, - { KUBERNETES_UPLOAD_JARS } + { KUBERNETES_UPLOAD_JARS }, + { KUBERNETES_UPLOAD_FILES } }; /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index fed9334dbbab..2b733c3dea63 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -61,7 +61,9 @@ private[spark] class Client( private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) - private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS) + private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty) + private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty) + validateNoDuplicateUploadFileNames() private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) @@ -511,18 +513,40 @@ private[spark] class Client( case "container" => ContainerAppResource(appResourceUri.getPath) case other => RemoteAppResource(other) } - - val uploadJarsBase64Contents = compressJars(uploadedJars) + val uploadJarsBase64Contents = compressFiles(uploadedJars) + val uploadFilesBase64Contents = compressFiles(uploadedFiles) KubernetesCreateSubmissionRequest( appResource = resolvedAppResource, mainClass = mainClass, appArgs = appArgs, secret = secretBase64String, sparkProperties = sparkConf.getAll.toMap, - uploadedJarsBase64Contents = uploadJarsBase64Contents) + uploadedJarsBase64Contents = uploadJarsBase64Contents, + uploadedFilesBase64Contents = uploadFilesBase64Contents) + } + + // Because uploaded files should be added to the working directory of the driver, they + // need to not have duplicate file names. They are added to the working directory so the + // user can reliably locate them in their application. + private def validateNoDuplicateUploadFileNames(): Unit = { + uploadedFiles.foreach { unsplitPaths => + val splitPaths = unsplitPaths.split(",") + val allPathsByFileName = splitPaths.groupBy(new File(_).getName) + val pathsWithDuplicateNames = allPathsByFileName.filter(_._2.length > 1) + if (pathsWithDuplicateNames.nonEmpty) { + val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames + .values + .flatten + .toList + .sortBy(new File(_).getName) + throw new SparkException("Cannot upload files with duplicate names via" + + s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. 
The following paths have a duplicated" + + s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}") + } + } } - private def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = { + private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = { maybeFilePaths .map(_.split(",")) .map(CompressionUtils.createTarGzip(_)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index 9b145370f87d..bb3d5012c2a4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -94,7 +94,17 @@ package object config { private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS = ConfigBuilder("spark.kubernetes.driver.uploads.jars") .doc(""" - | Comma-separated list of jars to sent to the driver and + | Comma-separated list of jars to send to the driver and + | all executors when submitting the application in cluster + | mode. + """.stripMargin) + .stringConf + .createOptional + + private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES = + ConfigBuilder("spark.kubernetes.driver.uploads.files") + .doc(""" + | Comma-separated list of files to send to the driver and | all executors when submitting the application in cluster | mode. """.stripMargin) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala index 8beba23bc8e1..6aeb851a16bf 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -26,7 +26,8 @@ case class KubernetesCreateSubmissionRequest( appArgs: Array[String], sparkProperties: Map[String, String], secret: String, - uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest { + uploadedJarsBase64Contents: Option[TarGzippedData], + uploadedFilesBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest { message = "create" clientSparkVersion = SPARK_VERSION } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala index 1c95dacc7eb0..8fbab21369e3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala @@ -26,6 +26,7 @@ import org.apache.commons.compress.utils.CharsetNames import org.apache.commons.io.IOUtils import scala.collection.mutable +import org.apache.spark.SparkException import org.apache.spark.deploy.rest.TarGzippedData import org.apache.spark.internal.Logging import org.apache.spark.util.{ByteBufferOutputStream, Utils} @@ -46,7 +47,8 @@ private[spark] object CompressionUtils extends Logging { * @param paths A list of file paths to be archived * @return An in-memory representation of the compressed data. 
*/ - def createTarGzip(paths: Iterable[String]): TarGzippedData = { + def createTarGzip(paths: Iterable[String]): + TarGzippedData = { val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw => Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping => Utils.tryWithResource(new TarArchiveOutputStream( @@ -68,8 +70,8 @@ private[spark] object CompressionUtils extends Logging { while (usedFileNames.contains(resolvedFileName)) { val oldResolvedFileName = resolvedFileName resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension" - logWarning(s"File with name $oldResolvedFileName already exists. Trying to add with" + - s" file name $resolvedFileName instead.") + logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" + + s" with file name $resolvedFileName instead.") deduplicationCounter += 1 } usedFileNames += resolvedFileName diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 451dc96dd65e..ab11e5200b50 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.rest.kubernetes import java.io.File import java.net.URI +import java.nio.file.Paths import java.util.concurrent.CountDownLatch import javax.servlet.http.{HttpServletRequest, HttpServletResponse} @@ -27,7 +28,7 @@ import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} +import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.rest._ import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils} @@ -149,7 +150,8 @@ private[spark] class KubernetesSparkRestServer( appArgs, sparkProperties, secret, - uploadedJars) => + uploadedJars, + uploadedFiles) => val decodedSecret = Base64.decodeBase64(secret) if (!expectedApplicationSecret.sameElements(decodedSecret)) { responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED) @@ -157,29 +159,33 @@ private[spark] class KubernetesSparkRestServer( } else { val tempDir = Utils.createTempDir() val appResourcePath = resolvedAppResource(appResource, tempDir) - val jarsDirectory = new File(tempDir, "jars") - if (!jarsDirectory.mkdir) { - throw new IllegalStateException("Failed to create jars dir at" + - s"${jarsDirectory.getAbsolutePath}") - } - val writtenJars = writeBase64ContentsToFiles(uploadedJars, jarsDirectory) - val driverExtraClasspath = sparkProperties - .get("spark.driver.extraClassPath") - .map(_.split(",")) - .getOrElse(Array.empty[String]) + val writtenJars = writeUploadedJars(uploadedJars, tempDir) + val writtenFiles = writeUploadedFiles(uploadedFiles) + val resolvedSparkProperties = new mutable.HashMap[String, String] + resolvedSparkProperties ++= sparkProperties + + // Resolve driver classpath and jars val originalJars = sparkProperties.get("spark.jars") .map(_.split(",")) .getOrElse(Array.empty[String]) val resolvedJars = writtenJars ++ originalJars ++ 
Array(appResourcePath) val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath) + val driverExtraClasspath = sparkProperties + .get("spark.driver.extraClassPath") + .map(_.split(",")) + .getOrElse(Array.empty[String]) val driverClasspath = driverExtraClasspath ++ resolvedJars ++ - sparkJars ++ - Array(appResourcePath) - val resolvedSparkProperties = new mutable.HashMap[String, String] - resolvedSparkProperties ++= sparkProperties + sparkJars resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",") + // Resolve spark.files + val originalFiles = sparkProperties.get("spark.files") + .map(_.split(",")) + .getOrElse(Array.empty[String]) + val resolvedFiles = originalFiles ++ writtenFiles + resolvedSparkProperties("spark.files") = resolvedFiles.mkString + val command = new ArrayBuffer[String] command += javaExecutable command += "-cp" @@ -229,6 +235,21 @@ private[spark] class KubernetesSparkRestServer( } } + private def writeUploadedJars(files: Option[TarGzippedData], rootTempDir: File): + Seq[String] = { + val resolvedDirectory = new File(rootTempDir, "jars") + if (!resolvedDirectory.mkdir()) { + throw new IllegalStateException(s"Failed to create jars dir at " + + resolvedDirectory.getAbsolutePath) + } + writeBase64ContentsToFiles(files, resolvedDirectory) + } + + private def writeUploadedFiles(files: Option[TarGzippedData]): Seq[String] = { + val workingDir = Paths.get("").toFile.getAbsoluteFile + writeBase64ContentsToFiles(files, workingDir) + } + def resolvedAppResource(appResource: AppResource, tempDir: File): String = { val appResourcePath = appResource match { case UploadedAppResource(resourceContentsBase64, resourceName) => diff --git a/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala new file mode 100644 index 000000000000..8b8d5e05f647 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests-spark-jobs/src/main/scala/org/apache/spark/deploy/kubernetes/integrationtest/jobs/FileExistenceTest.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.integrationtest.jobs + +import java.nio.file.Paths + +import com.google.common.base.Charsets +import com.google.common.io.Files + +import org.apache.spark.SparkException +import org.apache.spark.sql.SparkSession + +private[spark] object FileExistenceTest { + + def main(args: Array[String]): Unit = { + if (args.length < 2) { + throw new IllegalArgumentException("Usage: FileExistenceTest <file> <expected contents>") + } + // Can't use SparkContext.textFile since the file is local to the driver + val file = Paths.get(args(0)).toFile + if (!file.exists()) { + throw new SparkException(s"Failed to find file at ${file.getAbsolutePath}") + } else { + // scalastyle:off println + val contents = Files.toString(file, Charsets.UTF_8) + if (args(1) != contents) { + throw new SparkException(s"Contents do not match. Expected: ${args(1)}," + + s" actual: $contents") + } else { + println(s"File found at ${file.getAbsolutePath} with correct contents.") + } + // scalastyle:on println + } + val spark = SparkSession.builder() + .appName("Test") + .getOrCreate() + spark.stop() + } + +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 16de71118dec..64f046fb3c26 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -21,7 +21,9 @@ import java.nio.file.Paths import java.util.UUID import java.util.concurrent.TimeUnit +import com.google.common.base.Charsets import com.google.common.collect.ImmutableList +import com.google.common.io.Files import com.google.common.util.concurrent.SettableFuture import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.{Config, KubernetesClient, KubernetesClientException, Watcher} @@ -62,10 +64,14 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .getOrElse(throw new IllegalStateException("Expected to find spark-examples jar; was the" + " pre-integration-test phase run?")) + private val TEST_EXISTENCE_FILE = Paths.get("test-data", "input.txt").toFile + private val TEST_EXISTENCE_FILE_CONTENTS = Files.toString(TEST_EXISTENCE_FILE, Charsets.UTF_8) private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) - private val MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + + private val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.SparkPiWithInfiniteWait" + private val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + + ".integrationtest.jobs.FileExistenceTest" private val NAMESPACE = UUID.randomUUID().toString.replaceAll("-", "") private var minikubeKubernetesClient: KubernetesClient = _ private var clientConfig: Config = _ @@ -179,7 +185,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { new Client( sparkConf = sparkConf, - mainClass = MAIN_CLASS, + mainClass = SPARK_PI_MAIN_CLASS, mainAppResource = mainAppResource, appArgs = Array.empty[String]).run() val sparkMetricsService = getSparkMetricsService("spark-pi") @@ -196,7 +202,7 @@ private[spark] class KubernetesSuite extends
SparkFunSuite with BeforeAndAfter { "--executor-cores", "1", "--num-executors", "1", "--upload-jars", HELPER_JAR, - "--class", MAIN_CLASS, + "--class", SPARK_PI_MAIN_CLASS, "--conf", "spark.ui.enabled=true", "--conf", "spark.testing=false", "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", @@ -279,7 +285,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--executor-cores", "1", "--num-executors", "1", "--upload-jars", HELPER_JAR, - "--class", MAIN_CLASS, + "--class", SPARK_PI_MAIN_CLASS, "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", @@ -317,7 +323,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--executor-cores", "1", "--num-executors", "1", "--upload-jars", HELPER_JAR, - "--class", MAIN_CLASS, + "--class", SPARK_PI_MAIN_CLASS, "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", @@ -334,4 +340,73 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { EXAMPLES_JAR) SparkSubmit.main(args) } + + test("Added files should exist on the driver.") { + val args = Array( + "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443", + "--deploy-mode", "cluster", + "--kubernetes-namespace", NAMESPACE, + "--name", "spark-file-existence-test", + "--executor-memory", "512m", + "--executor-cores", "1", + "--num-executors", "1", + "--upload-jars", HELPER_JAR, + "--upload-files", TEST_EXISTENCE_FILE.getAbsolutePath, + "--class", FILE_EXISTENCE_MAIN_CLASS, + "--conf", "spark.ui.enabled=true", + "--conf", "spark.testing=false", + "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", + "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", + "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", + "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", + "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", + EXAMPLES_JAR, + TEST_EXISTENCE_FILE.getName, + TEST_EXISTENCE_FILE_CONTENTS) + val podCompletedFuture = SettableFuture.create[Boolean] + val watch = new Watcher[Pod] { + override def eventReceived(action: Action, pod: Pod): Unit = { + val containerStatuses = pod.getStatus.getContainerStatuses.asScala + val allSuccessful = containerStatuses.nonEmpty && containerStatuses + .forall(status => { + status.getState.getTerminated != null && status.getState.getTerminated.getExitCode == 0 + }) + if (allSuccessful) { + podCompletedFuture.set(true) + } else { + val failedContainers = containerStatuses.filter(container => { + container.getState.getTerminated != null && + container.getState.getTerminated.getExitCode != 0 + }) + if (failedContainers.nonEmpty) { + podCompletedFuture.setException(new SparkException( + "One or more containers in the driver failed with a nonzero exit code.")) + } + } + } + + override def onClose(e: KubernetesClientException): Unit = { + logWarning("Watch closed", e) + } + } + Utils.tryWithResource(minikubeKubernetesClient + .pods + .withLabel("spark-app-name", "spark-file-existence-test") + .watch(watch)) { _ => + SparkSubmit.main(args) + 
assert(podCompletedFuture.get, "Failed to run driver pod") + val driverPod = minikubeKubernetesClient + .pods + .withLabel("spark-app-name", "spark-file-existence-test") + .list() + .getItems + .get(0) + val podLog = minikubeKubernetesClient + .pods + .withName(driverPod.getMetadata.getName) + .getLog + assert(podLog.contains(s"File found at /opt/spark/${TEST_EXISTENCE_FILE.getName}" + + s" with correct contents."), "Job did not find the file as expected.") + } + } } diff --git a/resource-managers/kubernetes/integration-tests/test-data/input.txt b/resource-managers/kubernetes/integration-tests/test-data/input.txt new file mode 100644 index 000000000000..dfe437bdebeb --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/test-data/input.txt @@ -0,0 +1 @@ +Contents From f094a74bbd4f75627eff1102eee1fdf518665998 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 1 Feb 2017 17:04:15 -0800 Subject: [PATCH 2/2] Address comments and add documentation --- docs/running-on-kubernetes.md | 12 ++++++- .../spark/deploy/kubernetes/Client.scala | 34 +++++++++---------- .../spark/deploy/kubernetes/config.scala | 6 +++- .../rest/kubernetes/CompressionUtils.scala | 4 +-- .../KubernetesSparkRestServer.scala | 2 +- .../integrationtest/KubernetesSuite.scala | 4 +-- 6 files changed, 37 insertions(+), 25 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index e256535fbbc9..5a48bb254a6d 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -217,10 +217,20 @@ from the other deployment modes. See the [configuration page](configuration.html spark.kubernetes.driver.uploads.jars (none) - Comma-separated list of jars to sent to the driver and all executors when submitting the application in cluster + Comma-separated list of jars to send to the driver and all executors when submitting the application in cluster mode. Refer to adding other jars for more information. + + spark.kubernetes.driver.uploads.files + (none) + + Comma-separated list of files to send to the driver and all executors when submitting the application in cluster + mode. The files are added in a flat hierarchy to the current working directory of the driver, having the same + names as the names of the original files. Note that two files with the same name cannot be added, even if they + were in different source directories on the client disk. 
+ + spark.kubernetes.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 2b733c3dea63..0f9b19b983de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -63,7 +63,7 @@ private[spark] class Client( private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty) private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty) - validateNoDuplicateUploadFileNames() + uploadedFiles.foreach(validateNoDuplicateUploadFileNames) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) @@ -527,22 +527,22 @@ private[spark] class Client( // Because uploaded files should be added to the working directory of the driver, they // need to not have duplicate file names. They are added to the working directory so the - // user can reliably locate them in their application. - private def validateNoDuplicateUploadFileNames(): Unit = { - uploadedFiles.foreach { unsplitPaths => - val splitPaths = unsplitPaths.split(",") - val allPathsByFileName = splitPaths.groupBy(new File(_).getName) - val pathsWithDuplicateNames = allPathsByFileName.filter(_._2.length > 1) - if (pathsWithDuplicateNames.nonEmpty) { - val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames - .values - .flatten - .toList - .sortBy(new File(_).getName) - throw new SparkException("Cannot upload files with duplicate names via" + - s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" + - s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}") - } + // user can reliably locate them in their application. This is similar in principle to how + // YARN handles its `spark.files` setting. + private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = { + val pathsWithDuplicateNames = uploadedFilesCommaSeparated + .split(",") + .groupBy(new File(_).getName) + .filter(_._2.length > 1) + if (pathsWithDuplicateNames.nonEmpty) { + val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames + .values + .flatten + .toList + .sortBy(new File(_).getName) + throw new SparkException("Cannot upload files with duplicate names via" + + s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" + + s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}") } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index bb3d5012c2a4..3e0c400febca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -106,7 +106,11 @@ package object config { .doc(""" | Comma-separated list of files to send to the driver and | all executors when submitting the application in cluster - | mode. + | mode. 
The files are added in a flat hierarchy to the + | current working directory of the driver, having the same + | names as the names of the original files. Note that two + | files with the same name cannot be added, even if they + | were in different source directories on the client disk. """.stripMargin) .stringConf .createOptional diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala index 8fbab21369e3..7204cb874aae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/CompressionUtils.scala @@ -26,7 +26,6 @@ import org.apache.commons.compress.utils.CharsetNames import org.apache.commons.io.IOUtils import scala.collection.mutable -import org.apache.spark.SparkException import org.apache.spark.deploy.rest.TarGzippedData import org.apache.spark.internal.Logging import org.apache.spark.util.{ByteBufferOutputStream, Utils} @@ -47,8 +46,7 @@ private[spark] object CompressionUtils extends Logging { * @param paths A list of file paths to be archived * @return An in-memory representation of the compressed data. */ - def createTarGzip(paths: Iterable[String]): - TarGzippedData = { + def createTarGzip(paths: Iterable[String]): TarGzippedData = { val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw => Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping => Utils.tryWithResource(new TarArchiveOutputStream( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index ab11e5200b50..c5a7e27b1592 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -184,7 +184,7 @@ private[spark] class KubernetesSparkRestServer( .map(_.split(",")) .getOrElse(Array.empty[String]) val resolvedFiles = originalFiles ++ writtenFiles - resolvedSparkProperties("spark.files") = resolvedFiles.mkString + resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",") val command = new ArrayBuffer[String] command += javaExecutable diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 64f046fb3c26..40867c40d447 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -353,8 +353,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--upload-jars", HELPER_JAR, "--upload-files", TEST_EXISTENCE_FILE.getAbsolutePath, "--class", FILE_EXISTENCE_MAIN_CLASS, - "--conf", "spark.ui.enabled=true", - "--conf", "spark.testing=false", + "--conf", 
"spark.ui.enabled=false", + "--conf", "spark.testing=true", "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",