From a06641bd17b42406e1b06dcccb6a1007d2f41fae Mon Sep 17 00:00:00 2001
From: Andrew Ash
Date: Thu, 9 Feb 2017 22:52:17 -0800
Subject: [PATCH] Use spark.jars and spark.files instead of k8s-specific config

This brings spark.jars and spark.files in line with the behavior of YARN and
other cluster managers. Specifically, the following schemes are now supported:

- local:// is a container-local file assumed to be present on both the driver
  and executor containers
- container:// is a synonym for local://
- file:// is a submitter-local file that is uploaded to the driver
- a path with no scheme is treated as if it had the file:// scheme

Filenames in spark.files must be unique, since the files are all placed in the
current working directory of the driver and executors. spark.jars does not
have this restriction: each jar is given a unique suffix, placed in a separate
folder outside the current working directory, and added to the driver
classpath.
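As an illustration of the scheme handling described above (the object name and
all paths below are invented for this example; only the two property names
come from Spark itself), a submission might set:

    import org.apache.spark.SparkConf

    object ExampleSubmissionConf {
      val conf = new SparkConf()
        .set("spark.jars",
          "local:///opt/spark/examples/jars/already-in-image.jar," +  // assumed present in the container image
            "/home/user/build/my-app-dep.jar")                        // no scheme: uploaded by the submitter
        .set("spark.files",
          "file:///home/user/conf/app.properties," +                  // uploaded by the submitter
            "hdfs:///shared/reference-data.csv")                      // other schemes stay in spark.files as-is
    }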
---
 .../spark/deploy/kubernetes/Client.scala          | 57 ++++++++-----------
 .../spark/deploy/kubernetes/config.scala          | 24 --------
 .../rest/KubernetesRestProtocolMessages.scala     | 23 ++++++++
 .../KubernetesSparkRestServer.scala               | 20 +++++--
 4 files changed, 62 insertions(+), 62 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 9eed9bfd2cd7..8fdcfaa58f1b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -22,6 +22,9 @@ import java.util
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 import com.google.common.util.concurrent.SettableFuture
@@ -29,13 +32,11 @@ import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.codec.binary.Base64
-import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
+import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, TarGzippedData}
 import org.apache.spark.deploy.rest.kubernetes._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -59,9 +60,9 @@ private[spark] class Client(
   private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
   private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
-  private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty)
-  private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty)
-  uploadedFiles.foreach(validateNoDuplicateUploadFileNames)
+  private val sparkJars = sparkConf.getOption("spark.jars")
+  private val sparkFiles = sparkConf.getOption("spark.files")
+  sparkFiles.foreach(validateNoDuplicateFileNames)
   private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
   private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
 
@@ -79,7 +80,7 @@ private[spark] class Client(
 
   def run(): Unit = {
     logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
-    Seq(uploadedFiles, uploadedJars, Some(mainAppResource)).foreach(checkForFilesExistence)
+    Seq(sparkJars, sparkFiles, Some(mainAppResource)).foreach(checkForFilesExistence)
 
     val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
     val parsedCustomLabels = parseCustomLabels(customLabels)
@@ -674,22 +675,9 @@ private[spark] class Client(
   }
 
   private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
-    val appResourceUri = Utils.resolveURI(mainAppResource)
-    val resolvedAppResource: AppResource = appResourceUri.getScheme match {
-      case "file" | null =>
-        val appFile = new File(appResourceUri.getPath)
-        if (!appFile.isFile) {
-          throw new IllegalStateException("Provided local file path does not exist" +
-            s" or is not a file: ${appFile.getAbsolutePath}")
-        }
-        val fileBytes = Files.toByteArray(appFile)
-        val fileBase64 = Base64.encodeBase64String(fileBytes)
-        UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
-      case "container" => ContainerAppResource(appResourceUri.getPath)
-      case other => RemoteAppResource(other)
-    }
-    val uploadJarsBase64Contents = compressFiles(uploadedJars)
-    val uploadFilesBase64Contents = compressFiles(uploadedFiles)
+    val resolvedAppResource: AppResource = AppResource.assemble(mainAppResource)
+    val uploadJarsBase64Contents = compressUploadableFiles(sparkJars)
+    val uploadFilesBase64Contents = compressUploadableFiles(sparkFiles)
     KubernetesCreateSubmissionRequest(
       appResource = resolvedAppResource,
       mainClass = mainClass,
@@ -700,11 +688,10 @@ private[spark] class Client(
       uploadedFilesBase64Contents = uploadFilesBase64Contents)
   }
 
-  // Because uploaded files should be added to the working directory of the driver, they
-  // need to not have duplicate file names. They are added to the working directory so the
-  // user can reliably locate them in their application. This is similar in principle to how
-  // YARN handles its `spark.files` setting.
-  private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = {
+  // Because files should be added to the working directory of the driver, they need to not have
+  // duplicate file names. They are added to the working directory so the user can reliably
+  // locate them in their application. This is similar to how YARN handles `spark.files`.
+  private def validateNoDuplicateFileNames(uploadedFilesCommaSeparated: String): Unit = {
     val pathsWithDuplicateNames = uploadedFilesCommaSeparated
       .split(",")
      .groupBy(new File(_).getName)
@@ -715,15 +702,21 @@ private[spark] class Client(
        .flatten
        .toList
        .sortBy(new File(_).getName)
-      throw new SparkException("Cannot upload files with duplicate names via" +
-        s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" +
-        s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}")
+      throw new SparkException("Cannot support files with duplicate names in spark.files because " +
+        "the files are placed into the working directory of the driver and executors. The " +
+        "following paths have a duplicated file name: " +
+        pathsWithDuplicateNamesSorted.mkString(","))
    }
  }

-  private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
+  private def compressUploadableFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
    maybeFilePaths
      .map(_.split(","))
+      .map(_.filter(Utils.resolveURI(_).getScheme match {
+        // only local files need to be uploaded
+        case "file" | null => true
+        case _ => false
+      }))
      .map(CompressionUtils.createTarGzip(_))
  }

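For reference, the filtering rule introduced in compressUploadableFiles can be
read as the following self-contained sketch (the helper name and the sample
paths are invented; Utils.resolveURI is the same Spark-internal helper the
patch uses, so this would only compile inside the org.apache.spark tree):

    import org.apache.spark.util.Utils

    // Keep only the entries the submission client has to upload: file:// URIs
    // and scheme-less paths. Everything else (local://, container://, hdfs://,
    // http://, ...) stays in spark.jars / spark.files and is not uploaded.
    def uploadablePaths(commaSeparatedPaths: String): Seq[String] =
      commaSeparatedPaths.split(",").toSeq.filter { path =>
        Utils.resolveURI(path).getScheme match {
          case "file" | null => true
          case _ => false
        }
      }

    // uploadablePaths("/tmp/dep.jar,local:///opt/lib/on-image.jar,hdfs:///libs/shared.jar")
    // is expected to return Seq("/tmp/dep.jar")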
The " + + "following paths have a duplicated file name:" + + pathsWithDuplicateNamesSorted.mkString(",")) } } - private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = { + private def compressUploadableFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = { maybeFilePaths .map(_.split(",")) + .map(_.filter(Utils.resolveURI(_).getScheme match { + // only local files that need to be uploaded + case "file" | null => true + case _ => false + })) .map(CompressionUtils.createTarGzip(_)) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index cb4cd42142ca..ad83b0446538 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -91,30 +91,6 @@ package object config { .stringConf .createWithDefault("default") - private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS = - ConfigBuilder("spark.kubernetes.driver.uploads.jars") - .doc(""" - | Comma-separated list of jars to send to the driver and - | all executors when submitting the application in cluster - | mode. - """.stripMargin) - .stringConf - .createOptional - - private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES = - ConfigBuilder("spark.kubernetes.driver.uploads.files") - .doc(""" - | Comma-separated list of files to send to the driver and - | all executors when submitting the application in cluster - | mode. The files are added in a flat hierarchy to the - | current working directory of the driver, having the same - | names as the names of the original files. Note that two - | files with the same name cannot be added, even if they - | were in different source directories on the client disk. - """.stripMargin) - .stringConf - .createOptional - // Note that while we set a default for this when we start up the // scheduler, the specific default value is dynamically determined // based on the executor memory. 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
index 6aeb851a16bf..dc1804ae2d24 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/KubernetesRestProtocolMessages.scala
@@ -16,9 +16,14 @@
  */
 package org.apache.spark.deploy.rest
 
+import java.io.File
+
 import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
+import com.google.common.io.Files
+import org.apache.commons.codec.binary.Base64
 
 import org.apache.spark.SPARK_VERSION
+import org.apache.spark.util.Utils
 
 case class KubernetesCreateSubmissionRequest(
     appResource: AppResource,
@@ -63,3 +68,21 @@ class PingResponse extends SubmitRestProtocolResponse {
   serverSparkVersion = SPARK_VERSION
 }
 
+object AppResource {
+  def assemble(appResource: String): AppResource = {
+    val appResourceUri = Utils.resolveURI(appResource)
+    appResourceUri.getScheme match {
+      case "file" | null =>
+        val appFile = new File(appResourceUri.getPath)
+        if (!appFile.isFile) {
+          throw new IllegalStateException("Provided local file path does not exist" +
+            s" or is not a file: ${appFile.getAbsolutePath}")
+        }
+        val fileBytes = Files.toByteArray(appFile)
+        val fileBase64 = Base64.encodeBase64String(fileBytes)
+        UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
+      case "container" | "local" => ContainerAppResource(appResourceUri.getPath)
+      case other => RemoteAppResource(other)
+    }
+  }
+}
\ No newline at end of file
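As a quick usage sketch of the new AppResource.assemble factory (the jar paths
are invented; the mapping simply restates the match above):

    import org.apache.spark.deploy.rest.AppResource

    // Expected classification of a few hypothetical main application resources:
    //   "/home/user/build/my-app.jar"           -> UploadedAppResource (file bytes are base64-encoded
    //                                              into the request; throws if the local file is missing)
    //   "file:///home/user/build/my-app.jar"    -> UploadedAppResource
    //   "local:///opt/spark/app/my-app.jar"     -> ContainerAppResource (already present in the image)
    //   "container:///opt/spark/app/my-app.jar" -> ContainerAppResource
    //   "hdfs:///apps/my-app.jar"               -> RemoteAppResource
    val resource: AppResource = AppResource.assemble("local:///opt/spark/app/my-app.jar")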
resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",") // Resolve spark.files - val originalFiles = sparkProperties.get("spark.files") + val nonUploadedFiles = sparkProperties.get("spark.files") .map(_.split(",")) + .map(_.filter(Utils.resolveURI(_).getScheme match { + case "file" | null => false + case _ => true + })) .getOrElse(Array.empty[String]) - val resolvedFiles = originalFiles ++ writtenFiles + val resolvedFiles = nonUploadedFiles ++ writtenFiles resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",") val command = new ArrayBuffer[String] @@ -250,7 +258,7 @@ private[spark] class KubernetesSparkRestServer( writeBase64ContentsToFiles(files, workingDir) } - def resolvedAppResource(appResource: AppResource, tempDir: File): String = { + def resolveAppResourceToLocalPath(appResource: AppResource, tempDir: File): String = { val appResourcePath = appResource match { case UploadedAppResource(resourceContentsBase64, resourceName) => val resourceFile = new File(tempDir, resourceName)