From 94111940b7f7cb7f1575682aa589e5f56d465bf0 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 10 Feb 2017 18:01:43 -0800 Subject: [PATCH 1/3] Change the API contract for uploading local jars. This mirrors similarly to what YARN and Mesos expects. --- .../org/apache/spark/deploy/SparkSubmit.scala | 9 +- .../spark/deploy/SparkSubmitArguments.scala | 14 --- docs/running-on-kubernetes.md | 102 +--------------- .../launcher/SparkSubmitOptionParser.java | 8 +- .../spark/deploy/kubernetes/Client.scala | 115 ++++++------------ .../spark/deploy/kubernetes/config.scala | 24 ---- .../rest/KubernetesRestProtocolMessages.scala | 4 +- .../rest/kubernetes/KubernetesFileUtils.scala | 44 +++++++ .../KubernetesSparkRestServer.scala | 97 +++++++++------ .../integrationtest/KubernetesSuite.scala | 14 +-- 10 files changed, 163 insertions(+), 268 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 5874511820a34..c62b97333cba5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -462,10 +462,6 @@ object SparkSubmit extends CommandLineUtils { OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES, sysProp = "spark.kubernetes.namespace"), - OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER, - sysProp = "spark.kubernetes.driver.uploads.jars"), - OptionAssigner(args.kubernetesUploadFiles, KUBERNETES, CLUSTER, - sysProp = "spark.kubernetes.driver.uploads.files"), // Other options OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES, @@ -474,10 +470,11 @@ object SparkSubmit extends CommandLineUtils { sysProp = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.cores.max"), - OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES, + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, sysProp = "spark.files"), OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"), - OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"), + OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, + sysProp = "spark.jars"), OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER, sysProp = "spark.driver.memory"), OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 9eb1a16df809b..90903325d7759 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -73,8 +73,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S // Kubernetes only var kubernetesNamespace: String = null - var kubernetesUploadJars: String = null - var kubernetesUploadFiles: String = null // Standalone cluster mode only var supervise: Boolean = false @@ -194,12 +192,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S kubernetesNamespace = Option(kubernetesNamespace) .orElse(sparkProperties.get("spark.kubernetes.namespace")) .orNull - kubernetesUploadJars = 
Option(kubernetesUploadJars) - .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars")) - .orNull - kubernetesUploadFiles = Option(kubernetesUploadFiles) - .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files")) - .orNull // Try to set main class from JAR if no --class argument is given if (mainClass == null && !isPython && !isR && primaryResource != null) { @@ -441,12 +433,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S case KUBERNETES_NAMESPACE => kubernetesNamespace = value - case KUBERNETES_UPLOAD_JARS => - kubernetesUploadJars = value - - case KUBERNETES_UPLOAD_FILES => - kubernetesUploadFiles = value - case HELP => printUsageAndExit(0) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 5a48bb254a6df..f4712173d87a1 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -52,86 +52,14 @@ connect without SSL on a different port, the master would be set to `k8s://http: Note that applications can currently only be executed in cluster mode, where the driver and its executors are running on the cluster. -### Adding Other JARs +### Dependency Management and Docker Containers -Spark allows users to provide dependencies that are bundled into the driver's Docker image, or that are on the local -disk of the submitter's machine. These two types of dependencies are specified via different configuration options to -`spark-submit`: +Spark supports specifying JAR paths that are either on the submitting host's disk, or are located on the disk of the +driver and executors. Refer to the [application submission](submitting-applications.html#advanced-dependency-management) +section for details. Note that files specified with the `local` scheme should be added to the container image of both +the driver and the executors. Files without a scheme or with the scheme `file://` are treated as being on the disk of +the submitting machine, and are uploaded to the driver running in Kubernetes before launching the application. -* Local jars provided by specifying the `--jars` command line argument to `spark-submit`, or by setting `spark.jars` in - the application's configuration, will be treated as jars that are located on the *disk of the driver container*. This - only applies to jar paths that do not specify a scheme or that have the scheme `file://`. Paths with other schemes are - fetched from their appropriate locations. -* Local jars provided by specifying the `--upload-jars` command line argument to `spark-submit`, or by setting - `spark.kubernetes.driver.uploads.jars` in the application's configuration, will be treated as jars that are located on - the *disk of the submitting machine*. These jars are uploaded to the driver docker container before executing the - application. -* A main application resource path that does not have a scheme or that has the scheme `file://` is assumed to be on the - *disk of the submitting machine*. This resource is uploaded to the driver docker container before executing the - application. A remote path can still be specified and the resource will be fetched from the appropriate location. -* A main application resource path that has the scheme `container://` is assumed to be on the *disk of the driver - container*. - -In all of these cases, the jars are placed on the driver's classpath, and are also sent to the executors. Below are some -examples of providing application dependencies. 
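As an illustrative sketch of the new contract (the master address, image names, and jar paths below are placeholders reused from examples elsewhere in this change), a dependency on the submitting machine is passed through the ordinary `--jars` flag and uploaded automatically, while a jar already baked into the images is referenced with the `local://` scheme:

    bin/spark-submit \
      --deploy-mode cluster \
      --class com.example.applications.SampleApplication \
      --master k8s://192.168.99.100 \
      --jars /home/exampleuser/exampleapplication/dep1.jar,local:///opt/spark-plugins/app-plugin.jar \
      --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
      --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
      /home/exampleuser/exampleapplication/main.jar

Here `dep1.jar` and `main.jar` are read from the submitting machine and uploaded before the application starts, whereas `app-plugin.jar` must already exist in both the driver and executor images.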
- -To submit an application with both the main resource and two other jars living on the submitting user's machine: - - bin/spark-submit \ - --deploy-mode cluster \ - --class com.example.applications.SampleApplication \ - --master k8s://192.168.99.100 \ - --upload-jars /home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ - /home/exampleuser/exampleapplication/main.jar - -Note that since passing the jars through the `--upload-jars` command line argument is equivalent to setting the -`spark.kubernetes.driver.uploads.jars` Spark property, the above will behave identically to this command: - - bin/spark-submit \ - --deploy-mode cluster \ - --class com.example.applications.SampleApplication \ - --master k8s://192.168.99.100 \ - --conf spark.kubernetes.driver.uploads.jars=/home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ - /home/exampleuser/exampleapplication/main.jar - -To specify a main application resource that can be downloaded from an HTTP service, and if a plugin for that application -is located in the jar `/opt/spark-plugins/app-plugin.jar` on the docker image's disk: - - bin/spark-submit \ - --deploy-mode cluster \ - --class com.example.applications.PluggableApplication \ - --master k8s://192.168.99.100 \ - --jars /opt/spark-plugins/app-plugin.jar \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ - http://example.com:8080/applications/sparkpluggable/app.jar - -Note that since passing the jars through the `--jars` command line argument is equivalent to setting the `spark.jars` -Spark property, the above will behave identically to this command: - - bin/spark-submit \ - --deploy-mode cluster \ - --class com.example.applications.PluggableApplication \ - --master k8s://192.168.99.100 \ - --conf spark.jars=file:///opt/spark-plugins/app-plugin.jar \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ - http://example.com:8080/applications/sparkpluggable/app.jar - -To specify a main application resource that is in the Docker image, and if it has no other dependencies: - - bin/spark-submit \ - --deploy-mode cluster \ - --class com.example.applications.PluggableApplication \ - --master k8s://192.168.99.100:8443 \ - --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \ - --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \ - container:///home/applications/examples/example.jar - ### Setting Up SSL For Submitting the Driver When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server @@ -213,24 +141,6 @@ from the other deployment modes. See the [configuration page](configuration.html executor pods from the API server. 
- - spark.kubernetes.driver.uploads.jars - (none) - - Comma-separated list of jars to send to the driver and all executors when submitting the application in cluster - mode. Refer to adding other jars for more information. - - - - spark.kubernetes.driver.uploads.files - (none) - - Comma-separated list of files to send to the driver and all executors when submitting the application in cluster - mode. The files are added in a flat hierarchy to the current working directory of the driver, having the same - names as the names of the original files. Note that two files with the same name cannot be added, even if they - were in different source directories on the client disk. - - spark.kubernetes.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index 3369b5d8301be..a4d43c0795abc 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -77,10 +77,7 @@ class SparkSubmitOptionParser { protected final String QUEUE = "--queue"; // Kubernetes-only options. - protected final String KUBERNETES_MASTER = "--kubernetes-master"; protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace"; - protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars"; - protected final String KUBERNETES_UPLOAD_FILES = "--upload-files"; /** * This is the canonical list of spark-submit options. Each entry in the array contains the @@ -121,10 +118,7 @@ class SparkSubmitOptionParser { { REPOSITORIES }, { STATUS }, { TOTAL_EXECUTOR_CORES }, - { KUBERNETES_MASTER }, - { KUBERNETES_NAMESPACE }, - { KUBERNETES_UPLOAD_JARS }, - { KUBERNETES_UPLOAD_FILES } + { KUBERNETES_NAMESPACE } }; /** diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 9eed9bfd2cd79..0ba2aafcd057b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -35,7 +35,7 @@ import scala.collection.mutable import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource} +import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource} import org.apache.spark.deploy.rest.kubernetes._ import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -59,11 +59,10 @@ private[spark] class Client( private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl" private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId" private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE) - private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty) - private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty) - 
uploadedFiles.foreach(validateNoDuplicateUploadFileNames) private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT) private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT) + private val sparkFiles = sparkConf.getOption("spark.files") + private val sparkJars = sparkConf.getOption("spark.jars") private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION) @@ -78,9 +77,18 @@ private[spark] class Client( def run(): Unit = { logInfo(s"Starting application $kubernetesAppId in Kubernetes...") - - Seq(uploadedFiles, uploadedJars, Some(mainAppResource)).foreach(checkForFilesExistence) - + val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles) + val submitterLocalJars = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars) + (submitterLocalFiles ++ submitterLocalJars).foreach { file => + if (!new File(Utils.resolveURI(file).getPath).isFile) { + throw new SparkException(s"File $file is not a file or is a directory.") + } + } + if (KubernetesFileUtils.isUriLocalFile(mainAppResource) && + !new File(Utils.resolveURI(mainAppResource).getPath).isFile) { + throw new SparkException(s"Main app resource file $mainAppResource is not a file or" + + s" is a directory.") + } val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions() val parsedCustomLabels = parseCustomLabels(customLabels) var k8ConfBuilder = new K8SConfigBuilder() @@ -145,7 +153,7 @@ private[spark] class Client( } try { submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions, - ownerReferenceConfiguredDriverService) + ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars) // wait if configured to do so if (waitForAppCompletion) { logInfo(s"Waiting for application $kubernetesAppId to finish...") @@ -193,7 +201,9 @@ private[spark] class Client( private def submitApplicationToDriverServer( kubernetesClient: KubernetesClient, driverSubmitSslOptions: SSLOptions, - driverService: Service) = { + driverService: Service, + submitterLocalFiles: Iterable[String], + submitterLocalJars: Iterable[String]): Unit = { sparkConf.getOption("spark.app.id").foreach { id => logWarning(s"Warning: Provided app id in spark.app.id as $id will be" + s" overridden as $kubernetesAppId") @@ -211,7 +221,7 @@ private[spark] class Client( driverSubmitter.ping() logInfo(s"Submitting local resources to driver pod for application " + s"$kubernetesAppId ...") - val submitRequest = buildSubmissionRequest() + val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars) driverSubmitter.submitApplication(submitRequest) logInfo("Successfully submitted local resources and driver configuration to" + " driver pod.") @@ -502,25 +512,18 @@ private[spark] class Client( val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE) val resolvedSparkConf = sparkConf.clone() val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => { - val keyStoreURI = Utils.resolveURI(keyStore) - val isProvidedKeyStoreLocal = keyStoreURI.getScheme match { - case "file" | null => true - case "container" => false - case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" + - " for submit server must have scheme file:// or container:// (no scheme defaults" + - " to file://)") - } - (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath)) + (KubernetesFileUtils.isUriLocalFile(keyStore), + Option.apply(Utils.resolveURI(keyStore).getPath)) }).getOrElse((true, Option.empty[String])) 
resolvedKeyStore.foreach { resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _) } sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore => - val trustStoreURI = Utils.resolveURI(trustStore) - trustStoreURI.getScheme match { - case "file" | null => - resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath) - case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + + if (KubernetesFileUtils.isUriLocalFile(trustStore)) { + resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, + Utils.resolveURI(trustStore).getPath) + } else { + throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" + " for submit server must have no scheme, or scheme file://") } } @@ -673,11 +676,14 @@ private[spark] class Client( .build()) } - private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = { - val appResourceUri = Utils.resolveURI(mainAppResource) - val resolvedAppResource: AppResource = appResourceUri.getScheme match { - case "file" | null => - val appFile = new File(appResourceUri.getPath) + private def buildSubmissionRequest( + submitterLocalFiles: Iterable[String], + submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = { + val mainResourceUri = Utils.resolveURI(mainAppResource) + val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme) + .getOrElse("file") match { + case "file" => + val appFile = new File(mainResourceUri.getPath) if (!appFile.isFile) { throw new IllegalStateException("Provided local file path does not exist" + s" or is not a file: ${appFile.getAbsolutePath}") @@ -685,11 +691,13 @@ private[spark] class Client( val fileBytes = Files.toByteArray(appFile) val fileBase64 = Base64.encodeBase64String(fileBytes) UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName) - case "container" => ContainerAppResource(appResourceUri.getPath) + case "local" => ContainerAppResource(mainAppResource) case other => RemoteAppResource(other) } - val uploadJarsBase64Contents = compressFiles(uploadedJars) - val uploadFilesBase64Contents = compressFiles(uploadedFiles) + val uploadFilesBase64Contents = CompressionUtils.createTarGzip(submitterLocalFiles.map( + Utils.resolveURI(_).getPath)) + val uploadJarsBase64Contents = CompressionUtils.createTarGzip(submitterLocalJars.map( + Utils.resolveURI(_).getPath)) KubernetesCreateSubmissionRequest( appResource = resolvedAppResource, mainClass = mainClass, @@ -700,33 +708,6 @@ private[spark] class Client( uploadedFilesBase64Contents = uploadFilesBase64Contents) } - // Because uploaded files should be added to the working directory of the driver, they - // need to not have duplicate file names. They are added to the working directory so the - // user can reliably locate them in their application. This is similar in principle to how - // YARN handles its `spark.files` setting. - private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = { - val pathsWithDuplicateNames = uploadedFilesCommaSeparated - .split(",") - .groupBy(new File(_).getName) - .filter(_._2.length > 1) - if (pathsWithDuplicateNames.nonEmpty) { - val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames - .values - .flatten - .toList - .sortBy(new File(_).getName) - throw new SparkException("Cannot upload files with duplicate names via" + - s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. 
The following paths have a duplicated" + - s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}") - } - } - - private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = { - maybeFilePaths - .map(_.split(",")) - .map(CompressionUtils.createTarGzip(_)) - } - private def buildDriverSubmissionClient( kubernetesClient: KubernetesClient, service: Service, @@ -806,22 +787,6 @@ private[spark] class Client( }).toMap }).getOrElse(Map.empty[String, String]) } - - private def checkForFilesExistence(maybePaths: Option[String]): Unit = { - maybePaths.foreach { paths => - paths.split(",").foreach { path => - val uri = Utils.resolveURI(path) - uri.getScheme match { - case "file" | null => - val file = new File(uri.getPath) - if (!file.isFile) { - throw new SparkException(s"""file "${uri}" does not exist!""") - } - case _ => - } - } - } - } } private[spark] object Client extends Logging { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index cb4cd42142ca4..ad83b0446538e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -91,30 +91,6 @@ package object config { .stringConf .createWithDefault("default") - private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS = - ConfigBuilder("spark.kubernetes.driver.uploads.jars") - .doc(""" - | Comma-separated list of jars to send to the driver and - | all executors when submitting the application in cluster - | mode. - """.stripMargin) - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES = - ConfigBuilder("spark.kubernetes.driver.uploads.files") - .doc(""" - | Comma-separated list of files to send to the driver and - | all executors when submitting the application in cluster - | mode. The files are added in a flat hierarchy to the - | current working directory of the driver, having the same - | names as the names of the original files. Note that two - | files with the same name cannot be added, even if they - | were in different source directories on the client disk. - """.stripMargin) - .stringConf - .createOptional - // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined // based on the executor memory. 
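To make the scheme dispatch in `buildSubmissionRequest` concrete, a rough sketch of the three accepted forms of the main application resource follows; the jar locations are illustrative placeholders, not paths this change requires:

    # No scheme (or file://): read from the submitting machine and uploaded to the driver pod
    bin/spark-submit ... /home/exampleuser/exampleapplication/main.jar

    # local:// scheme: assumed to already be on the driver container's disk
    bin/spark-submit ... local:///opt/spark/examples/jars/spark-examples.jar

    # Any other scheme: fetched by the driver pod from the remote location before launch
    bin/spark-submit ... http://example.com:8080/applications/sparkpluggable/app.jar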
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala index 6aeb851a16bf4..0d2d1a1c6f5e3 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala @@ -26,8 +26,8 @@ case class KubernetesCreateSubmissionRequest( appArgs: Array[String], sparkProperties: Map[String, String], secret: String, - uploadedJarsBase64Contents: Option[TarGzippedData], - uploadedFilesBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest { + uploadedJarsBase64Contents: TarGzippedData, + uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest { message = "create" clientSparkVersion = SPARK_VERSION } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala new file mode 100644 index 0000000000000..5618c053d55bf --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.rest.kubernetes + +import org.apache.spark.util.Utils + +private[spark] object KubernetesFileUtils { + + private def filterUriStringsByScheme( + uris: Iterable[String], schemeFilter: (String => Boolean)): Iterable[String] = { + uris.filter(uri => schemeFilter(Option(Utils.resolveURI(uri).getScheme).getOrElse("file"))) + } + + def getOnlyContainerLocalOrRemoteFiles(uris: Iterable[String]): Iterable[String] = { + filterUriStringsByScheme(uris, _ != "file") + } + + def getOnlyContainerLocalFiles(uris: Iterable[String]): Iterable[String] = { + filterUriStringsByScheme(uris, _ == "local") + } + + def getOnlySubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = { + filterUriStringsByScheme(uris, _ == "file") + } + + def isUriLocalFile(uri: String): Boolean = { + Option(Utils.resolveURI(uri).getScheme).getOrElse("file") == "file" + } + +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index c5a7e27b15927..1b531161ae8cd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -145,46 +145,74 @@ private[spark] class KubernetesSparkRestServer( } else { requestMessage match { case KubernetesCreateSubmissionRequest( - appResource, - mainClass, - appArgs, - sparkProperties, - secret, - uploadedJars, - uploadedFiles) => + appResource, + mainClass, + appArgs, + sparkProperties, + secret, + uploadedJars, + uploadedFiles) => val decodedSecret = Base64.decodeBase64(secret) if (!expectedApplicationSecret.sameElements(decodedSecret)) { responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED) handleError("Unauthorized to submit application.") } else { val tempDir = Utils.createTempDir() - val appResourcePath = resolvedAppResource(appResource, tempDir) + val (appResourceLocalPath, appResourceAddedSparkJar) = + resolvedAppResource(appResource, tempDir) val writtenJars = writeUploadedJars(uploadedJars, tempDir) val writtenFiles = writeUploadedFiles(uploadedFiles) val resolvedSparkProperties = new mutable.HashMap[String, String] resolvedSparkProperties ++= sparkProperties - - // Resolve driver classpath and jars val originalJars = sparkProperties.get("spark.jars") .map(_.split(",")) - .getOrElse(Array.empty[String]) - val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath) - val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath) + .getOrElse(Array.empty) + + // The driver at this point has handed us the value of spark.jars verbatim as + // specified in spark-submit. At this point, remove all jars that were local + // to the submitting user's disk, and replace them with the paths that were + // written to disk above. + val onlyContainerLocalOrRemoteJars = KubernetesFileUtils + .getOnlyContainerLocalOrRemoteFiles(originalJars) + val resolvedJars = (writtenJars ++ + onlyContainerLocalOrRemoteJars ++ + Array(appResourceAddedSparkJar)).toSet + if (resolvedJars.nonEmpty) { + resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",") + } else { + resolvedSparkProperties.remove("spark.jars") + } + + // Determining the driver classpath is similar. 
It's the combination of: + // - Jars written from uploads + // - Jars in (spark.jars + mainAppResource) that has a "local" prefix + // - spark.driver.extraClasspath + // - Spark core jars from the installation + val sparkCoreJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath) val driverExtraClasspath = sparkProperties .get("spark.driver.extraClassPath") .map(_.split(",")) .getOrElse(Array.empty[String]) + val onlyContainerLocalJars = KubernetesFileUtils + .getOnlyContainerLocalFiles(originalJars) val driverClasspath = driverExtraClasspath ++ - resolvedJars ++ - sparkJars - resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",") + Seq(appResourceLocalPath) ++ + writtenJars ++ + onlyContainerLocalJars ++ + sparkCoreJars - // Resolve spark.files + // Resolve spark.files similarly to spark.jars. val originalFiles = sparkProperties.get("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) - val resolvedFiles = originalFiles ++ writtenFiles - resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",") + val onlyContainerLocalOrRemoteFiles = KubernetesFileUtils + .getOnlyContainerLocalOrRemoteFiles(originalFiles) + val resolvedFiles = writtenFiles ++ onlyContainerLocalOrRemoteFiles + if (resolvedFiles.nonEmpty) { + resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",") + } else { + resolvedSparkProperties.remove("spark.files") + } val command = new ArrayBuffer[String] command += javaExecutable @@ -235,35 +263,39 @@ private[spark] class KubernetesSparkRestServer( } } - private def writeUploadedJars(files: Option[TarGzippedData], rootTempDir: File): + private def writeUploadedJars(jars: TarGzippedData, rootTempDir: File): Seq[String] = { val resolvedDirectory = new File(rootTempDir, "jars") if (!resolvedDirectory.mkdir()) { throw new IllegalStateException(s"Failed to create jars dir at " + resolvedDirectory.getAbsolutePath) } - writeBase64ContentsToFiles(files, resolvedDirectory) + CompressionUtils.unpackAndWriteCompressedFiles(jars, resolvedDirectory) } - private def writeUploadedFiles(files: Option[TarGzippedData]): Seq[String] = { + private def writeUploadedFiles(files: TarGzippedData): Seq[String] = { val workingDir = Paths.get("").toFile.getAbsoluteFile - writeBase64ContentsToFiles(files, workingDir) + CompressionUtils.unpackAndWriteCompressedFiles(files, workingDir) } - def resolvedAppResource(appResource: AppResource, tempDir: File): String = { - val appResourcePath = appResource match { + // Retrieve the path on the driver container where the main app resource is, and what value it + // ought to have in the spark.jars property. The two may be different because for non-local + // dependencies, we have to fetch the resource (if it is not "local") but still want to use + // the full URI in spark.jars. 
+ private def resolvedAppResource(appResource: AppResource, tempDir: File): (String, String) = { + appResource match { case UploadedAppResource(resourceContentsBase64, resourceName) => val resourceFile = new File(tempDir, resourceName) val resourceFilePath = resourceFile.getAbsolutePath if (resourceFile.createNewFile()) { val resourceContentsBytes = Base64.decodeBase64(resourceContentsBase64) Files.write(resourceContentsBytes, resourceFile) - resourceFile.getAbsolutePath + (resourceFile.getAbsolutePath, resourceFile.getAbsolutePath) } else { throw new IllegalStateException(s"Failed to write main app resource file" + s" to $resourceFilePath") } - case ContainerAppResource(resource) => resource + case ContainerAppResource(resource) => (Utils.resolveURI(resource).getPath, resource) case RemoteAppResource(resource) => Utils.fetchFile(resource, tempDir, conf, securityManager, SparkHadoopUtil.get.newConfiguration(conf), @@ -275,19 +307,10 @@ private[spark] class KubernetesSparkRestServer( throw new IllegalStateException(s"Main app resource is not a file or" + s" does not exist at $downloadedFilePath") } - downloadedFilePath + (downloadedFilePath, resource) } - appResourcePath } } - - private def writeBase64ContentsToFiles( - maybeCompressedFiles: Option[TarGzippedData], - rootDir: File): Seq[String] = { - maybeCompressedFiles.map { compressedFiles => - CompressionUtils.unpackAndWriteCompressedFiles(compressedFiles, rootDir) - }.getOrElse(Seq.empty[String]) - } } private[spark] object KubernetesSparkRestServer { diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index c5458eccf830d..ec04d9d557270 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -174,7 +174,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { .set("spark.kubernetes.namespace", NAMESPACE) .set("spark.kubernetes.driver.docker.image", "spark-driver:latest") .set("spark.kubernetes.executor.docker.image", "spark-executor:latest") - .set("spark.kubernetes.driver.uploads.jars", HELPER_JAR) + .set("spark.jars", HELPER_JAR) .set("spark.executor.memory", "500m") .set("spark.executor.cores", "1") .set("spark.executors.instances", "1") @@ -202,7 +202,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--executor-memory", "512m", "--executor-cores", "1", "--num-executors", "1", - "--upload-jars", HELPER_JAR, + "--jars", HELPER_JAR, "--class", SPARK_PI_MAIN_CLASS, "--conf", "spark.ui.enabled=true", "--conf", "spark.testing=false", @@ -234,7 +234,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", "--conf", "spark.kubernetes.submit.waitAppCompletion=false", - s"container:///opt/spark/examples/jars/$EXAMPLES_JAR_FILE_NAME") + s"local:///opt/spark/examples/jars/$EXAMPLES_JAR_FILE_NAME") val allContainersSucceeded = SettableFuture.create[Boolean] val watcher = new Watcher[Pod] { override def eventReceived(action: Action, pod: Pod): Unit = 
{ @@ -287,7 +287,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--executor-memory", "512m", "--executor-cores", "1", "--num-executors", "1", - "--upload-jars", HELPER_JAR, + "--jars", HELPER_JAR, "--class", SPARK_PI_MAIN_CLASS, "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", @@ -326,7 +326,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--executor-memory", "512m", "--executor-cores", "1", "--num-executors", "1", - "--upload-jars", HELPER_JAR, + "--jars", HELPER_JAR, "--class", SPARK_PI_MAIN_CLASS, "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}", "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}", @@ -355,8 +355,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--executor-memory", "512m", "--executor-cores", "1", "--num-executors", "1", - "--upload-jars", HELPER_JAR, - "--upload-files", TEST_EXISTENCE_FILE.getAbsolutePath, + "--jars", HELPER_JAR, + "--files", TEST_EXISTENCE_FILE.getAbsolutePath, "--class", FILE_EXISTENCE_MAIN_CLASS, "--conf", "spark.ui.enabled=false", "--conf", "spark.testing=true", From 10f84e4ed239a3434c3540734e9baf287d4ed06c Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 16 Feb 2017 13:51:37 -0800 Subject: [PATCH 2/3] Address comments --- docs/running-on-kubernetes.md | 4 +- .../spark/deploy/kubernetes/Client.scala | 6 +- .../rest/kubernetes/KubernetesFileUtils.scala | 2 +- .../KubernetesSparkRestServer.scala | 48 +++++++---- .../kubernetes/docker-minimal-bundle/pom.xml | 6 -- .../src/main/assembly/driver-assembly.xml | 11 --- .../src/main/assembly/executor-assembly.xml | 11 --- .../kubernetes/integration-tests/pom.xml | 50 +++++++++++ .../integrationtest/KubernetesSuite.scala | 82 ++++--------------- 9 files changed, 101 insertions(+), 119 deletions(-) diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index faf7e50fb6d49..e5c7e9bb69448 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -74,9 +74,9 @@ pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`. One note about the keyStore is that it can be specified as either a file on the client machine or a file in the container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:` -or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto +or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme -`container:`, the file is assumed to already be on the container's disk at the appropriate path. +`local:`, the file is assumed to already be on the container's disk at the appropriate path. 
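For example, the two forms of the keyStore reference would be passed to `spark-submit` as follows (the paths are placeholders):

    # keyStore on the client machine, mounted into the driver pod as a secret volume
    --conf spark.ssl.kubernetes.submit.keyStore=file:///home/exampleuser/ssl/keyStore.jks

    # keyStore already present in the driver container image
    --conf spark.ssl.kubernetes.submit.keyStore=local:///opt/spark/ssl/keyStore.jks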
### Kubernetes Clusters and the authenticated proxy endpoint diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala index 0ba2aafcd057b..108a8c9510cd4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala @@ -81,7 +81,7 @@ private[spark] class Client( val submitterLocalJars = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars) (submitterLocalFiles ++ submitterLocalJars).foreach { file => if (!new File(Utils.resolveURI(file).getPath).isFile) { - throw new SparkException(s"File $file is not a file or is a directory.") + throw new SparkException(s"File $file does not exist or is a directory.") } } if (KubernetesFileUtils.isUriLocalFile(mainAppResource) && @@ -684,10 +684,6 @@ private[spark] class Client( .getOrElse("file") match { case "file" => val appFile = new File(mainResourceUri.getPath) - if (!appFile.isFile) { - throw new IllegalStateException("Provided local file path does not exist" + - s" or is not a file: ${appFile.getAbsolutePath}") - } val fileBytes = Files.toByteArray(appFile) val fileBase64 = Base64.encodeBase64String(fileBytes) UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala index 5618c053d55bf..f30be1535f81c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala @@ -25,7 +25,7 @@ private[spark] object KubernetesFileUtils { uris.filter(uri => schemeFilter(Option(Utils.resolveURI(uri).getScheme).getOrElse("file"))) } - def getOnlyContainerLocalOrRemoteFiles(uris: Iterable[String]): Iterable[String] = { + def getNonSubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = { filterUriStringsByScheme(uris, _ != "file") } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala index 1b531161ae8cd..f0b01b2320982 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala @@ -16,14 +16,14 @@ */ package org.apache.spark.deploy.rest.kubernetes -import java.io.File +import java.io.{File, FileOutputStream, StringReader} import java.net.URI import java.nio.file.Paths import java.util.concurrent.CountDownLatch import javax.servlet.http.{HttpServletRequest, HttpServletResponse} import com.google.common.base.Charsets -import com.google.common.io.Files +import com.google.common.io.{BaseEncoding, ByteStreams, Files} import org.apache.commons.codec.binary.Base64 import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -158,8 +158,7 @@ private[spark] class KubernetesSparkRestServer( 
handleError("Unauthorized to submit application.") } else { val tempDir = Utils.createTempDir() - val (appResourceLocalPath, appResourceAddedSparkJar) = - resolvedAppResource(appResource, tempDir) + val resolvedAppResource = resolveAppResource(appResource, tempDir) val writtenJars = writeUploadedJars(uploadedJars, tempDir) val writtenFiles = writeUploadedFiles(uploadedFiles) val resolvedSparkProperties = new mutable.HashMap[String, String] @@ -173,10 +172,10 @@ private[spark] class KubernetesSparkRestServer( // to the submitting user's disk, and replace them with the paths that were // written to disk above. val onlyContainerLocalOrRemoteJars = KubernetesFileUtils - .getOnlyContainerLocalOrRemoteFiles(originalJars) + .getNonSubmitterLocalFiles(originalJars) val resolvedJars = (writtenJars ++ onlyContainerLocalOrRemoteJars ++ - Array(appResourceAddedSparkJar)).toSet + Array(resolvedAppResource.sparkJarPath)).toSet if (resolvedJars.nonEmpty) { resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",") } else { @@ -196,7 +195,7 @@ private[spark] class KubernetesSparkRestServer( val onlyContainerLocalJars = KubernetesFileUtils .getOnlyContainerLocalFiles(originalJars) val driverClasspath = driverExtraClasspath ++ - Seq(appResourceLocalPath) ++ + Seq(resolvedAppResource.localPath) ++ writtenJars ++ onlyContainerLocalJars ++ sparkCoreJars @@ -206,7 +205,7 @@ private[spark] class KubernetesSparkRestServer( .map(_.split(",")) .getOrElse(Array.empty[String]) val onlyContainerLocalOrRemoteFiles = KubernetesFileUtils - .getOnlyContainerLocalOrRemoteFiles(originalFiles) + .getNonSubmitterLocalFiles(originalFiles) val resolvedFiles = writtenFiles ++ onlyContainerLocalOrRemoteFiles if (resolvedFiles.nonEmpty) { resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",") @@ -278,24 +277,35 @@ private[spark] class KubernetesSparkRestServer( CompressionUtils.unpackAndWriteCompressedFiles(files, workingDir) } - // Retrieve the path on the driver container where the main app resource is, and what value it - // ought to have in the spark.jars property. The two may be different because for non-local - // dependencies, we have to fetch the resource (if it is not "local") but still want to use - // the full URI in spark.jars. - private def resolvedAppResource(appResource: AppResource, tempDir: File): (String, String) = { + + /** + * Retrieve the path on the driver container where the main app resource is, and what value it + * ought to have in the spark.jars property. The two may be different because for non-local + * dependencies, we have to fetch the resource (if it is not "local") but still want to use + * the full URI in spark.jars. 
+ */ + private def resolveAppResource(appResource: AppResource, tempDir: File): + ResolvedAppResource = { appResource match { case UploadedAppResource(resourceContentsBase64, resourceName) => val resourceFile = new File(tempDir, resourceName) val resourceFilePath = resourceFile.getAbsolutePath if (resourceFile.createNewFile()) { - val resourceContentsBytes = Base64.decodeBase64(resourceContentsBase64) - Files.write(resourceContentsBytes, resourceFile) - (resourceFile.getAbsolutePath, resourceFile.getAbsolutePath) + Utils.tryWithResource(new StringReader(resourceContentsBase64)) { reader => + Utils.tryWithResource(new FileOutputStream(resourceFile)) { os => + Utils.tryWithResource(BaseEncoding.base64().decodingStream(reader)) { + decodingStream => + ByteStreams.copy(decodingStream, os) + } + } + } + ResolvedAppResource(resourceFile.getAbsolutePath, resourceFile.getAbsolutePath) } else { throw new IllegalStateException(s"Failed to write main app resource file" + s" to $resourceFilePath") } - case ContainerAppResource(resource) => (Utils.resolveURI(resource).getPath, resource) + case ContainerAppResource(resource) => + ResolvedAppResource(Utils.resolveURI(resource).getPath, resource) case RemoteAppResource(resource) => Utils.fetchFile(resource, tempDir, conf, securityManager, SparkHadoopUtil.get.newConfiguration(conf), @@ -307,10 +317,12 @@ private[spark] class KubernetesSparkRestServer( throw new IllegalStateException(s"Main app resource is not a file or" + s" does not exist at $downloadedFilePath") } - (downloadedFilePath, resource) + ResolvedAppResource(downloadedFilePath, resource) } } } + + private case class ResolvedAppResource(localPath: String, sparkJarPath: String) } private[spark] object KubernetesSparkRestServer { diff --git a/resource-managers/kubernetes/docker-minimal-bundle/pom.xml b/resource-managers/kubernetes/docker-minimal-bundle/pom.xml index 0ec2f36075db3..7f4d935e0e243 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/pom.xml +++ b/resource-managers/kubernetes/docker-minimal-bundle/pom.xml @@ -44,12 +44,6 @@ pom - - org.apache.spark - spark-examples_${scala.binary.version} - ${project.version} - provided -