diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5874511820a34..c62b97333cba5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -462,10 +462,6 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.kubernetesNamespace, KUBERNETES, ALL_DEPLOY_MODES,
sysProp = "spark.kubernetes.namespace"),
- OptionAssigner(args.kubernetesUploadJars, KUBERNETES, CLUSTER,
- sysProp = "spark.kubernetes.driver.uploads.jars"),
- OptionAssigner(args.kubernetesUploadFiles, KUBERNETES, CLUSTER,
- sysProp = "spark.kubernetes.driver.uploads.files"),
// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN, ALL_DEPLOY_MODES,
@@ -474,10 +470,11 @@ object SparkSubmit extends CommandLineUtils {
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
- OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
+ OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
sysProp = "spark.files"),
OptionAssigner(args.jars, LOCAL, CLIENT, sysProp = "spark.jars"),
- OptionAssigner(args.jars, STANDALONE | MESOS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
+ OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
+ sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN, CLUSTER,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN, CLUSTER,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9eb1a16df809b..90903325d7759 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -73,8 +73,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
// Kubernetes only
var kubernetesNamespace: String = null
- var kubernetesUploadJars: String = null
- var kubernetesUploadFiles: String = null
// Standalone cluster mode only
var supervise: Boolean = false
@@ -194,12 +192,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
kubernetesNamespace = Option(kubernetesNamespace)
.orElse(sparkProperties.get("spark.kubernetes.namespace"))
.orNull
- kubernetesUploadJars = Option(kubernetesUploadJars)
- .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.jars"))
- .orNull
- kubernetesUploadFiles = Option(kubernetesUploadFiles)
- .orElse(sparkProperties.get("spark.kubernetes.driver.uploads.files"))
- .orNull
// Try to set main class from JAR if no --class argument is given
if (mainClass == null && !isPython && !isR && primaryResource != null) {
@@ -441,12 +433,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case KUBERNETES_NAMESPACE =>
kubernetesNamespace = value
- case KUBERNETES_UPLOAD_JARS =>
- kubernetesUploadJars = value
-
- case KUBERNETES_UPLOAD_FILES =>
- kubernetesUploadFiles = value
-
case HELP =>
printUsageAndExit(0)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 19f406039e261..e5c7e9bb69448 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -51,87 +51,15 @@ connect without SSL on a different port, the master would be set to `k8s://http:
Note that applications can currently only be executed in cluster mode, where the driver and its executors are running on
the cluster.
-
-### Adding Other JARs
-
-Spark allows users to provide dependencies that are bundled into the driver's Docker image, or that are on the local
-disk of the submitter's machine. These two types of dependencies are specified via different configuration options to
-`spark-submit`:
-* Local jars provided by specifying the `--jars` command line argument to `spark-submit`, or by setting `spark.jars` in
- the application's configuration, will be treated as jars that are located on the *disk of the driver container*. This
- only applies to jar paths that do not specify a scheme or that have the scheme `file://`. Paths with other schemes are
- fetched from their appropriate locations.
-* Local jars provided by specifying the `--upload-jars` command line argument to `spark-submit`, or by setting
- `spark.kubernetes.driver.uploads.jars` in the application's configuration, will be treated as jars that are located on
- the *disk of the submitting machine*. These jars are uploaded to the driver docker container before executing the
- application.
-* A main application resource path that does not have a scheme or that has the scheme `file://` is assumed to be on the
- *disk of the submitting machine*. This resource is uploaded to the driver docker container before executing the
- application. A remote path can still be specified and the resource will be fetched from the appropriate location.
-* A main application resource path that has the scheme `container://` is assumed to be on the *disk of the driver
- container*.
-
-In all of these cases, the jars are placed on the driver's classpath, and are also sent to the executors. Below are some
-examples of providing application dependencies.
-
-To submit an application with both the main resource and two other jars living on the submitting user's machine:
-
- bin/spark-submit \
- --deploy-mode cluster \
- --class com.example.applications.SampleApplication \
- --master k8s://192.168.99.100 \
- --upload-jars /home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \
- --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
- --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
- /home/exampleuser/exampleapplication/main.jar
-
-Note that since passing the jars through the `--upload-jars` command line argument is equivalent to setting the
-`spark.kubernetes.driver.uploads.jars` Spark property, the above will behave identically to this command:
-
- bin/spark-submit \
- --deploy-mode cluster \
- --class com.example.applications.SampleApplication \
- --master k8s://192.168.99.100 \
- --conf spark.kubernetes.driver.uploads.jars=/home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \
- --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
- --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
- /home/exampleuser/exampleapplication/main.jar
-
-To specify a main application resource that can be downloaded from an HTTP service, and if a plugin for that application
-is located in the jar `/opt/spark-plugins/app-plugin.jar` on the docker image's disk:
-
- bin/spark-submit \
- --deploy-mode cluster \
- --class com.example.applications.PluggableApplication \
- --master k8s://192.168.99.100 \
- --jars /opt/spark-plugins/app-plugin.jar \
- --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
- --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
- http://example.com:8080/applications/sparkpluggable/app.jar
-
-Note that since passing the jars through the `--jars` command line argument is equivalent to setting the `spark.jars`
-Spark property, the above will behave identically to this command:
-
- bin/spark-submit \
- --deploy-mode cluster \
- --class com.example.applications.PluggableApplication \
- --master k8s://192.168.99.100 \
- --conf spark.jars=file:///opt/spark-plugins/app-plugin.jar \
- --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
- --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
- http://example.com:8080/applications/sparkpluggable/app.jar
-
-To specify a main application resource that is in the Docker image, and if it has no other dependencies:
-
- bin/spark-submit \
- --deploy-mode cluster \
- --class com.example.applications.PluggableApplication \
- --master k8s://192.168.99.100:8443 \
- --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
- --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
- container:///home/applications/examples/example.jar
+
+### Dependency Management and Docker Containers
+
+Spark supports specifying JAR paths that are either on the submitting host's disk or already present on the disks of
+the driver and executors. Refer to the [application submission](submitting-applications.html#advanced-dependency-management)
+section for details. Note that files specified with the `local` scheme must be present in the container images of both
+the driver and the executors, while files without a scheme or with the scheme `file://` are treated as being on the
+disk of the submitting machine and are uploaded to the driver running in Kubernetes before launching the application.
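+
+For example, a submission that provides a jar from the submitting machine alongside a jar that is already present in
+the driver and executor Docker images might look like the following (class names, paths, and image names are
+illustrative):
+
+    bin/spark-submit \
+      --deploy-mode cluster \
+      --class com.example.applications.SampleApplication \
+      --master k8s://192.168.99.100 \
+      --jars /home/exampleuser/exampleapplication/dep1.jar,local:///opt/spark-plugins/app-plugin.jar \
+      --conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
+      --conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
+      /home/exampleuser/exampleapplication/main.jar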
+
### Setting Up SSL For Submitting the Driver
When submitting to Kubernetes, a pod is started for the driver, and the pod starts an HTTP server. This HTTP server
@@ -146,9 +74,9 @@ pod in starting the application, set `spark.ssl.kubernetes.submit.trustStore`.
One note about the keyStore is that it can be specified as either a file on the client machine or a file in the
container image's disk. Thus `spark.ssl.kubernetes.submit.keyStore` can be a URI with a scheme of either `file:`
-or `container:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
+or `local:`. A scheme of `file:` corresponds to the keyStore being located on the client machine; it is mounted onto
the driver container as a [secret volume](https://kubernetes.io/docs/user-guide/secrets/). When the URI has the scheme
-`container:`, the file is assumed to already be on the container's disk at the appropriate path.
+`local:`, the file is assumed to already be on the container's disk at the appropriate path.
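+
+For example, a keyStore that is already baked into the driver's container image might be referenced as follows (the
+path is illustrative), whereas a keyStore on the submitting machine would be given as a plain path or a `file://` URI:
+
+    --conf spark.ssl.kubernetes.submit.keyStore=local:///opt/spark/ssl/driver-keystore.jks
+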
### Kubernetes Clusters and the authenticated proxy endpoint
@@ -241,24 +169,6 @@ from the other deployment modes. See the [configuration page](configuration.html
executor pods from the API server.
-<tr>
-  <td><code>spark.kubernetes.driver.uploads.jars</code></td>
-  <td>(none)</td>
-  <td>
-    Comma-separated list of jars to send to the driver and all executors when submitting the application in cluster
-    mode. Refer to adding other jars for more information.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.kubernetes.driver.uploads.files</code></td>
-  <td>(none)</td>
-  <td>
-    Comma-separated list of files to send to the driver and all executors when submitting the application in cluster
-    mode. The files are added in a flat hierarchy to the current working directory of the driver, having the same
-    names as the names of the original files. Note that two files with the same name cannot be added, even if they
-    were in different source directories on the client disk.
-  </td>
-</tr>
  <td><code>spark.kubernetes.executor.memoryOverhead</code></td>
  <td>executorMemory * 0.10, with minimum of 384</td>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 3369b5d8301be..a4d43c0795abc 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -77,10 +77,7 @@ class SparkSubmitOptionParser {
protected final String QUEUE = "--queue";
// Kubernetes-only options.
- protected final String KUBERNETES_MASTER = "--kubernetes-master";
protected final String KUBERNETES_NAMESPACE = "--kubernetes-namespace";
- protected final String KUBERNETES_UPLOAD_JARS = "--upload-jars";
- protected final String KUBERNETES_UPLOAD_FILES = "--upload-files";
/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -121,10 +118,7 @@ class SparkSubmitOptionParser {
{ REPOSITORIES },
{ STATUS },
{ TOTAL_EXECUTOR_CORES },
- { KUBERNETES_MASTER },
- { KUBERNETES_NAMESPACE },
- { KUBERNETES_UPLOAD_JARS },
- { KUBERNETES_UPLOAD_FILES }
+ { KUBERNETES_NAMESPACE }
};
/**
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 279ee505de609..aa273a024f6f9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -35,7 +35,7 @@ import scala.collection.mutable
import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
+import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -59,11 +59,10 @@ private[spark] class Client(
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
- private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty)
- private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty)
- uploadedFiles.foreach(validateNoDuplicateUploadFileNames)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
+ private val sparkFiles = sparkConf.getOption("spark.files")
+ .map(_.split(","))
+ .getOrElse(Array.empty[String])
+ private val sparkJars = sparkConf.getOption("spark.jars")
+ .map(_.split(","))
+ .getOrElse(Array.empty[String])
private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
@@ -78,9 +77,18 @@ private[spark] class Client(
def run(): Unit = {
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
-
- Seq(uploadedFiles, uploadedJars, Some(mainAppResource)).foreach(checkForFilesExistence)
-
+ val submitterLocalFiles = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkFiles)
+ val submitterLocalJars = KubernetesFileUtils.getOnlySubmitterLocalFiles(sparkJars)
+ (submitterLocalFiles ++ submitterLocalJars).foreach { file =>
+ if (!new File(Utils.resolveURI(file).getPath).isFile) {
+ throw new SparkException(s"File $file does not exist or is a directory.")
+ }
+ }
+ if (KubernetesFileUtils.isUriLocalFile(mainAppResource) &&
+ !new File(Utils.resolveURI(mainAppResource).getPath).isFile) {
+ throw new SparkException(s"Main app resource file $mainAppResource is not a file or" +
+ s" is a directory.")
+ }
val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
val parsedCustomLabels = parseCustomLabels(customLabels)
var k8ConfBuilder = new K8SConfigBuilder()
@@ -145,7 +153,7 @@ private[spark] class Client(
}
try {
submitApplicationToDriverServer(kubernetesClient, driverSubmitSslOptions,
- ownerReferenceConfiguredDriverService)
+ ownerReferenceConfiguredDriverService, submitterLocalFiles, submitterLocalJars)
// wait if configured to do so
if (waitForAppCompletion) {
logInfo(s"Waiting for application $kubernetesAppId to finish...")
@@ -193,7 +201,9 @@ private[spark] class Client(
private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverSubmitSslOptions: SSLOptions,
- driverService: Service) = {
+ driverService: Service,
+ submitterLocalFiles: Iterable[String],
+ submitterLocalJars: Iterable[String]): Unit = {
sparkConf.getOption("spark.app.id").foreach { id =>
logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
s" overridden as $kubernetesAppId")
@@ -211,7 +221,7 @@ private[spark] class Client(
driverSubmitter.ping()
logInfo(s"Submitting local resources to driver pod for application " +
s"$kubernetesAppId ...")
- val submitRequest = buildSubmissionRequest()
+ val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars)
driverSubmitter.submitApplication(submitRequest)
logInfo("Successfully submitted local resources and driver configuration to" +
" driver pod.")
@@ -502,25 +512,18 @@ private[spark] class Client(
val maybeKeyStore = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_KEYSTORE)
val resolvedSparkConf = sparkConf.clone()
val (isLocalKeyStore, resolvedKeyStore) = maybeKeyStore.map(keyStore => {
- val keyStoreURI = Utils.resolveURI(keyStore)
- val isProvidedKeyStoreLocal = keyStoreURI.getScheme match {
- case "file" | null => true
- case "container" => false
- case _ => throw new SparkException(s"Invalid KeyStore URI $keyStore; keyStore URI" +
- " for submit server must have scheme file:// or container:// (no scheme defaults" +
- " to file://)")
- }
- (isProvidedKeyStoreLocal, Option.apply(keyStoreURI.getPath))
- }).getOrElse((true, Option.empty[String]))
+ (KubernetesFileUtils.isUriLocalFile(keyStore),
+ Option.apply(Utils.resolveURI(keyStore).getPath))
+ }).getOrElse((false, Option.empty[String]))
resolvedKeyStore.foreach {
resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_KEYSTORE, _)
}
sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE).foreach { trustStore =>
- val trustStoreURI = Utils.resolveURI(trustStore)
- trustStoreURI.getScheme match {
- case "file" | null =>
- resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE, trustStoreURI.getPath)
- case _ => throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
+ if (KubernetesFileUtils.isUriLocalFile(trustStore)) {
+ resolvedSparkConf.set(KUBERNETES_DRIVER_SUBMIT_TRUSTSTORE,
+ Utils.resolveURI(trustStore).getPath)
+ } else {
+ throw new SparkException(s"Invalid trustStore URI $trustStore; trustStore URI" +
" for submit server must have no scheme, or scheme file://")
}
}
@@ -673,23 +676,24 @@ private[spark] class Client(
.build())
}
- private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
- val appResourceUri = Utils.resolveURI(mainAppResource)
- val resolvedAppResource: AppResource = appResourceUri.getScheme match {
- case "file" | null =>
- val appFile = new File(appResourceUri.getPath)
- if (!appFile.isFile) {
- throw new IllegalStateException("Provided local file path does not exist" +
- s" or is not a file: ${appFile.getAbsolutePath}")
- }
+ private def buildSubmissionRequest(
+ submitterLocalFiles: Iterable[String],
+ submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = {
+ val mainResourceUri = Utils.resolveURI(mainAppResource)
+ val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme)
+ .getOrElse("file") match {
+ case "file" =>
+ val appFile = new File(mainResourceUri.getPath)
val fileBytes = Files.toByteArray(appFile)
val fileBase64 = Base64.encodeBase64String(fileBytes)
UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
- case "container" => ContainerAppResource(appResourceUri.getPath)
+ case "local" => ContainerAppResource(mainAppResource)
case other => RemoteAppResource(other)
}
- val uploadJarsBase64Contents = compressFiles(uploadedJars)
- val uploadFilesBase64Contents = compressFiles(uploadedFiles)
+ val uploadFilesBase64Contents = CompressionUtils.createTarGzip(submitterLocalFiles.map(
+ Utils.resolveURI(_).getPath))
+ val uploadJarsBase64Contents = CompressionUtils.createTarGzip(submitterLocalJars.map(
+ Utils.resolveURI(_).getPath))
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
@@ -700,33 +704,6 @@ private[spark] class Client(
uploadedFilesBase64Contents = uploadFilesBase64Contents)
}
- // Because uploaded files should be added to the working directory of the driver, they
- // need to not have duplicate file names. They are added to the working directory so the
- // user can reliably locate them in their application. This is similar in principle to how
- // YARN handles its `spark.files` setting.
- private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = {
- val pathsWithDuplicateNames = uploadedFilesCommaSeparated
- .split(",")
- .groupBy(new File(_).getName)
- .filter(_._2.length > 1)
- if (pathsWithDuplicateNames.nonEmpty) {
- val pathsWithDuplicateNamesSorted = pathsWithDuplicateNames
- .values
- .flatten
- .toList
- .sortBy(new File(_).getName)
- throw new SparkException("Cannot upload files with duplicate names via" +
- s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" +
- s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}")
- }
- }
-
- private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
- maybeFilePaths
- .map(_.split(","))
- .map(CompressionUtils.createTarGzip(_))
- }
-
private def buildDriverSubmissionClient(
kubernetesClient: KubernetesClient,
service: Service,
@@ -813,22 +790,6 @@ private[spark] class Client(
}).toMap
}).getOrElse(Map.empty[String, String])
}
-
- private def checkForFilesExistence(maybePaths: Option[String]): Unit = {
- maybePaths.foreach { paths =>
- paths.split(",").foreach { path =>
- val uri = Utils.resolveURI(path)
- uri.getScheme match {
- case "file" | null =>
- val file = new File(uri.getPath)
- if (!file.isFile) {
- throw new SparkException(s"""file "${uri}" does not exist!""")
- }
- case _ =>
- }
- }
- }
- }
}
private[spark] object Client extends Logging {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index cb4cd42142ca4..ad83b0446538e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -91,30 +91,6 @@ package object config {
.stringConf
.createWithDefault("default")
- private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
- ConfigBuilder("spark.kubernetes.driver.uploads.jars")
- .doc("""
- | Comma-separated list of jars to send to the driver and
- | all executors when submitting the application in cluster
- | mode.
- """.stripMargin)
- .stringConf
- .createOptional
-
- private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES =
- ConfigBuilder("spark.kubernetes.driver.uploads.files")
- .doc("""
- | Comma-separated list of files to send to the driver and
- | all executors when submitting the application in cluster
- | mode. The files are added in a flat hierarchy to the
- | current working directory of the driver, having the same
- | names as the names of the original files. Note that two
- | files with the same name cannot be added, even if they
- | were in different source directories on the client disk.
- """.stripMargin)
- .stringConf
- .createOptional
-
// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 6aeb851a16bf4..0d2d1a1c6f5e3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -26,8 +26,8 @@ case class KubernetesCreateSubmissionRequest(
appArgs: Array[String],
sparkProperties: Map[String, String],
secret: String,
- uploadedJarsBase64Contents: Option[TarGzippedData],
- uploadedFilesBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
+ uploadedJarsBase64Contents: TarGzippedData,
+ uploadedFilesBase64Contents: TarGzippedData) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala
new file mode 100644
index 0000000000000..f30be1535f81c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesFileUtils.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.rest.kubernetes
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
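+  // Scheme conventions: a URI with no scheme or with the scheme "file" refers to a file on the
+  // submitting user's disk, the scheme "local" refers to a file already present on the disks of
+  // the driver and executor containers, and any other scheme is treated as a remote location.
+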
+ private def filterUriStringsByScheme(
+ uris: Iterable[String], schemeFilter: (String => Boolean)): Iterable[String] = {
+ uris.filter(uri => schemeFilter(Option(Utils.resolveURI(uri).getScheme).getOrElse("file")))
+ }
+
+ def getNonSubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = {
+ filterUriStringsByScheme(uris, _ != "file")
+ }
+
+ def getOnlyContainerLocalFiles(uris: Iterable[String]): Iterable[String] = {
+ filterUriStringsByScheme(uris, _ == "local")
+ }
+
+ def getOnlySubmitterLocalFiles(uris: Iterable[String]): Iterable[String] = {
+ filterUriStringsByScheme(uris, _ == "file")
+ }
+
+ def isUriLocalFile(uri: String): Boolean = {
+ Option(Utils.resolveURI(uri).getScheme).getOrElse("file") == "file"
+ }
+
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
index c5a7e27b15927..f0b01b2320982 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/KubernetesSparkRestServer.scala
@@ -16,14 +16,14 @@
*/
package org.apache.spark.deploy.rest.kubernetes
-import java.io.File
+import java.io.{File, FileOutputStream, StringReader}
import java.net.URI
import java.nio.file.Paths
import java.util.concurrent.CountDownLatch
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import com.google.common.base.Charsets
-import com.google.common.io.Files
+import com.google.common.io.{BaseEncoding, ByteStreams, Files}
import org.apache.commons.codec.binary.Base64
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -145,46 +145,73 @@ private[spark] class KubernetesSparkRestServer(
} else {
requestMessage match {
case KubernetesCreateSubmissionRequest(
- appResource,
- mainClass,
- appArgs,
- sparkProperties,
- secret,
- uploadedJars,
- uploadedFiles) =>
+ appResource,
+ mainClass,
+ appArgs,
+ sparkProperties,
+ secret,
+ uploadedJars,
+ uploadedFiles) =>
val decodedSecret = Base64.decodeBase64(secret)
if (!expectedApplicationSecret.sameElements(decodedSecret)) {
responseServlet.setStatus(HttpServletResponse.SC_UNAUTHORIZED)
handleError("Unauthorized to submit application.")
} else {
val tempDir = Utils.createTempDir()
- val appResourcePath = resolvedAppResource(appResource, tempDir)
+ val resolvedAppResource = resolveAppResource(appResource, tempDir)
val writtenJars = writeUploadedJars(uploadedJars, tempDir)
val writtenFiles = writeUploadedFiles(uploadedFiles)
val resolvedSparkProperties = new mutable.HashMap[String, String]
resolvedSparkProperties ++= sparkProperties
-
- // Resolve driver classpath and jars
val originalJars = sparkProperties.get("spark.jars")
.map(_.split(","))
- .getOrElse(Array.empty[String])
- val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath)
- val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
+ .getOrElse(Array.empty)
+
+ // The submission client has handed us the value of spark.jars verbatim, as
+ // specified in spark-submit. Remove all jars that were local to the submitting
+ // user's disk and replace them with the paths that were written to disk above.
+ val onlyContainerLocalOrRemoteJars = KubernetesFileUtils
+ .getNonSubmitterLocalFiles(originalJars)
+ val resolvedJars = (writtenJars ++
+ onlyContainerLocalOrRemoteJars ++
+ Array(resolvedAppResource.sparkJarPath)).toSet
+ if (resolvedJars.nonEmpty) {
+ resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")
+ } else {
+ resolvedSparkProperties.remove("spark.jars")
+ }
+
+ // Determining the driver classpath is similar. It is the combination of:
+ // - Jars written from uploads
+ // - The main app resource and any jars in spark.jars that have a "local" scheme
+ // - spark.driver.extraClassPath
+ // - Spark core jars from the installation
+ val sparkCoreJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
val driverExtraClasspath = sparkProperties
.get("spark.driver.extraClassPath")
.map(_.split(","))
.getOrElse(Array.empty[String])
+ val onlyContainerLocalJars = KubernetesFileUtils
+ .getOnlyContainerLocalFiles(originalJars)
val driverClasspath = driverExtraClasspath ++
- resolvedJars ++
- sparkJars
- resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")
+ Seq(resolvedAppResource.localPath) ++
+ writtenJars ++
+ onlyContainerLocalJars ++
+ sparkCoreJars
- // Resolve spark.files
+ // Resolve spark.files similarly to spark.jars.
val originalFiles = sparkProperties.get("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
- val resolvedFiles = originalFiles ++ writtenFiles
- resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",")
+ val onlyContainerLocalOrRemoteFiles = KubernetesFileUtils
+ .getNonSubmitterLocalFiles(originalFiles)
+ val resolvedFiles = writtenFiles ++ onlyContainerLocalOrRemoteFiles
+ if (resolvedFiles.nonEmpty) {
+ resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",")
+ } else {
+ resolvedSparkProperties.remove("spark.files")
+ }
val command = new ArrayBuffer[String]
command += javaExecutable
@@ -235,35 +262,50 @@ private[spark] class KubernetesSparkRestServer(
}
}
- private def writeUploadedJars(files: Option[TarGzippedData], rootTempDir: File):
+ private def writeUploadedJars(jars: TarGzippedData, rootTempDir: File):
Seq[String] = {
val resolvedDirectory = new File(rootTempDir, "jars")
if (!resolvedDirectory.mkdir()) {
throw new IllegalStateException(s"Failed to create jars dir at " +
resolvedDirectory.getAbsolutePath)
}
- writeBase64ContentsToFiles(files, resolvedDirectory)
+ CompressionUtils.unpackAndWriteCompressedFiles(jars, resolvedDirectory)
}
- private def writeUploadedFiles(files: Option[TarGzippedData]): Seq[String] = {
+ private def writeUploadedFiles(files: TarGzippedData): Seq[String] = {
val workingDir = Paths.get("").toFile.getAbsoluteFile
- writeBase64ContentsToFiles(files, workingDir)
+ CompressionUtils.unpackAndWriteCompressedFiles(files, workingDir)
}
- def resolvedAppResource(appResource: AppResource, tempDir: File): String = {
- val appResourcePath = appResource match {
+
+ /**
+ * Retrieve the path on the driver container where the main app resource is located, and the
+ * value it ought to have in the spark.jars property. The two may differ because, for remote
+ * and container-local resources, the path on the container's disk is not the same as the URI
+ * that should be kept in spark.jars.
+ */
+ private def resolveAppResource(appResource: AppResource, tempDir: File):
+ ResolvedAppResource = {
+ appResource match {
case UploadedAppResource(resourceContentsBase64, resourceName) =>
val resourceFile = new File(tempDir, resourceName)
val resourceFilePath = resourceFile.getAbsolutePath
if (resourceFile.createNewFile()) {
- val resourceContentsBytes = Base64.decodeBase64(resourceContentsBase64)
- Files.write(resourceContentsBytes, resourceFile)
- resourceFile.getAbsolutePath
+ Utils.tryWithResource(new StringReader(resourceContentsBase64)) { reader =>
+ Utils.tryWithResource(new FileOutputStream(resourceFile)) { os =>
+ Utils.tryWithResource(BaseEncoding.base64().decodingStream(reader)) {
+ decodingStream =>
+ ByteStreams.copy(decodingStream, os)
+ }
+ }
+ }
+ ResolvedAppResource(resourceFile.getAbsolutePath, resourceFile.getAbsolutePath)
} else {
throw new IllegalStateException(s"Failed to write main app resource file" +
s" to $resourceFilePath")
}
- case ContainerAppResource(resource) => resource
+ case ContainerAppResource(resource) =>
+ ResolvedAppResource(Utils.resolveURI(resource).getPath, resource)
case RemoteAppResource(resource) =>
Utils.fetchFile(resource, tempDir, conf,
securityManager, SparkHadoopUtil.get.newConfiguration(conf),
@@ -275,19 +317,12 @@ private[spark] class KubernetesSparkRestServer(
throw new IllegalStateException(s"Main app resource is not a file or" +
s" does not exist at $downloadedFilePath")
}
- downloadedFilePath
+ ResolvedAppResource(downloadedFilePath, resource)
}
- appResourcePath
}
}
- private def writeBase64ContentsToFiles(
- maybeCompressedFiles: Option[TarGzippedData],
- rootDir: File): Seq[String] = {
- maybeCompressedFiles.map { compressedFiles =>
- CompressionUtils.unpackAndWriteCompressedFiles(compressedFiles, rootDir)
- }.getOrElse(Seq.empty[String])
- }
+ private case class ResolvedAppResource(localPath: String, sparkJarPath: String)
}
private[spark] object KubernetesSparkRestServer {
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/pom.xml b/resource-managers/kubernetes/docker-minimal-bundle/pom.xml
index 0ec2f36075db3..7f4d935e0e243 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/pom.xml
+++ b/resource-managers/kubernetes/docker-minimal-bundle/pom.xml
@@ -44,12 +44,6 @@
pom
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-examples_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>provided</scope>
-    </dependency>