@@ -22,20 +22,21 @@ import java.util
import java.util.concurrent.{CountDownLatch, TimeUnit}
import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

import scala.collection.JavaConverters._
import scala.collection.mutable

import com.google.common.base.Charsets
import com.google.common.io.Files
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SecurityManager, SparkConf, SparkException, SSLOptions}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, TarGzippedData}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -59,9 +60,9 @@ private[spark] class Client(
private val sslSecretsDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId-ssl"
private val sslSecretsName = s"$SUBMISSION_SSL_SECRETS_PREFIX-$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uploadedJars = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_JARS).filter(_.nonEmpty)
private val uploadedFiles = sparkConf.get(KUBERNETES_DRIVER_UPLOAD_FILES).filter(_.nonEmpty)
uploadedFiles.foreach(validateNoDuplicateUploadFileNames)
private val sparkJars = sparkConf.getOption("spark.jars")
private val sparkFiles = sparkConf.getOption("spark.files")
sparkFiles.foreach(validateNoDuplicateFileNames)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)

@@ -79,7 +80,7 @@ private[spark] class Client(
def run(): Unit = {
logInfo(s"Starting application $kubernetesAppId in Kubernetes...")

Seq(uploadedFiles, uploadedJars, Some(mainAppResource)).foreach(checkForFilesExistence)
Seq(sparkJars, sparkFiles, Some(mainAppResource)).foreach(checkForFilesExistence)

val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
val parsedCustomLabels = parseCustomLabels(customLabels)
@@ -674,22 +675,9 @@ private[spark] class Client(
}

private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
val appResourceUri = Utils.resolveURI(mainAppResource)
val resolvedAppResource: AppResource = appResourceUri.getScheme match {
case "file" | null =>
val appFile = new File(appResourceUri.getPath)
if (!appFile.isFile) {
throw new IllegalStateException("Provided local file path does not exist" +
s" or is not a file: ${appFile.getAbsolutePath}")
}
val fileBytes = Files.toByteArray(appFile)
val fileBase64 = Base64.encodeBase64String(fileBytes)
UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
case "container" => ContainerAppResource(appResourceUri.getPath)
case other => RemoteAppResource(other)
}
val uploadJarsBase64Contents = compressFiles(uploadedJars)
val uploadFilesBase64Contents = compressFiles(uploadedFiles)
val resolvedAppResource: AppResource = AppResource.assemble(mainAppResource)
val uploadJarsBase64Contents = compressUploadableFiles(sparkJars)
val uploadFilesBase64Contents = compressUploadableFiles(sparkFiles)
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
@@ -700,11 +688,10 @@ private[spark] class Client(
uploadedFilesBase64Contents = uploadFilesBase64Contents)
}

// Because uploaded files should be added to the working directory of the driver, they
// need to not have duplicate file names. They are added to the working directory so the
// user can reliably locate them in their application. This is similar in principle to how
// YARN handles its `spark.files` setting.
private def validateNoDuplicateUploadFileNames(uploadedFilesCommaSeparated: String): Unit = {
// Because files should be added to the working directory of the driver, they must not have
// duplicate file names. They are added to the working directory so the user can reliably
// locate them in their application. This is similar to how YARN handles `spark.files`.
private def validateNoDuplicateFileNames(uploadedFilesCommaSeparated: String): Unit = {
val pathsWithDuplicateNames = uploadedFilesCommaSeparated
.split(",")
.groupBy(new File(_).getName)
@@ -715,15 +702,21 @@ private[spark] class Client(
.flatten
.toList
.sortBy(new File(_).getName)
throw new SparkException("Cannot upload files with duplicate names via" +
s" ${KUBERNETES_DRIVER_UPLOAD_FILES.key}. The following paths have a duplicated" +
s" file name: ${pathsWithDuplicateNamesSorted.mkString(",")}")
throw new SparkException("Cannot support files with duplicate names in spark.files because " +
"the files are placed into the working directory of the driver and executors. The " +
"following paths have a duplicated file name:" +
pathsWithDuplicateNamesSorted.mkString(","))
}
}
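
For example, a hedged illustration of the check above (the paths are made up): two spark.files entries that share a base name are rejected.

// Hypothetical paths; both resolve to the base name "settings.conf", so the
// validation above throws a SparkException.
validateNoDuplicateFileNames("/etc/staging/settings.conf,/etc/prod/settings.conf")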

private def compressFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
private def compressUploadableFiles(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
Review comment: I'm not entirely sure it's worth keeping the TarGzippedData fields as Options anymore. Sure, we save a few bytes of extra data to upload when there aren't any jars, but it's an extra layer of indirection that has to be semantically understood. (A sketch of the non-Option alternative follows the method below.)

maybeFilePaths
.map(_.split(","))
.map(_.filter(Utils.resolveURI(_).getScheme match {
// Only local files need to be uploaded.
case "file" | null => true
case _ => false
}))
.map(CompressionUtils.createTarGzip(_))
}
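
A minimal sketch of the non-Option alternative raised in the comment above, assuming CompressionUtils.createTarGzip accepts an empty sequence; the method name is illustrative and not part of the patch.

// Always return an archive; an empty file list simply yields an empty archive,
// so the request fields could be plain TarGzippedData instead of Option[TarGzippedData].
private def compressUploadableFilesAlways(maybeFilePaths: Option[String]): TarGzippedData = {
  val localPaths = maybeFilePaths
    .map(_.split(","))
    .getOrElse(Array.empty[String])
    .filter(path => Option(Utils.resolveURI(path).getScheme).getOrElse("file") == "file")
  CompressionUtils.createTarGzip(localPaths)
}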

@@ -91,30 +91,6 @@ package object config {
.stringConf
.createWithDefault("default")

private[spark] val KUBERNETES_DRIVER_UPLOAD_JARS =
ConfigBuilder("spark.kubernetes.driver.uploads.jars")
.doc("""
| Comma-separated list of jars to send to the driver and
| all executors when submitting the application in cluster
| mode.
""".stripMargin)
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_UPLOAD_FILES =
ConfigBuilder("spark.kubernetes.driver.uploads.files")
.doc("""
| Comma-separated list of files to send to the driver and
| all executors when submitting the application in cluster
| mode. The files are added in a flat hierarchy to the
| current working directory of the driver, having the same
| names as the names of the original files. Note that two
| files with the same name cannot be added, even if they
| were in different source directories on the client disk.
""".stripMargin)
.stringConf
.createOptional
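
With these properties removed, local dependencies ride on the standard Spark settings instead. A hedged sketch of the equivalent configuration; the paths are placeholders.

import org.apache.spark.SparkConf

// What used to go into spark.kubernetes.driver.uploads.jars / .files now goes
// into the standard properties; local (file) paths are uploaded, other schemes
// are left in place.
val conf = new SparkConf()
  .set("spark.jars", "/opt/deps/extra-lib.jar")
  .set("spark.files", "/opt/conf/app-settings.properties")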

// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
@@ -16,9 +16,14 @@
*/
package org.apache.spark.deploy.rest

import java.io.File

import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo}
import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64

import org.apache.spark.SPARK_VERSION
import org.apache.spark.util.Utils

case class KubernetesCreateSubmissionRequest(
appResource: AppResource,
@@ -63,3 +68,21 @@ class PingResponse extends SubmitRestProtocolResponse {
serverSparkVersion = SPARK_VERSION
}

object AppResource {
Review comment: Mark the object as private[spark].

def assemble(appResource: String): AppResource = {
val appResourceUri = Utils.resolveURI(appResource)
appResourceUri.getScheme match {
case "file" | null =>
val appFile = new File(appResourceUri.getPath)
if (!appFile.isFile) {
throw new IllegalStateException("Provided local file path does not exist" +
s" or is not a file: ${appFile.getAbsolutePath}")
}
val fileBytes = Files.toByteArray(appFile)
val fileBase64 = Base64.encodeBase64String(fileBytes)
UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
case "container" | "local" => ContainerAppResource(appResourceUri.getPath)
case other => RemoteAppResource(other)
}
}
}
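
For reference, a hedged sketch of how assemble maps different URI forms; the URIs below are made up.

AppResource.assemble("/opt/app/my-app.jar")                 // file or missing scheme: reads the local file (must exist) into an UploadedAppResource
AppResource.assemble("local:///opt/spark/examples/app.jar") // local scheme: ContainerAppResource with the path inside the image
AppResource.assemble("container:/opt/image/app.jar")        // container scheme: ContainerAppResource
AppResource.assemble("https://example.com/app.jar")         // any other scheme: RemoteAppResource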
@@ -158,17 +158,21 @@ private[spark] class KubernetesSparkRestServer(
handleError("Unauthorized to submit application.")
} else {
val tempDir = Utils.createTempDir()
val appResourcePath = resolvedAppResource(appResource, tempDir)
val appResourcePath = resolveAppResourceToLocalPath(appResource, tempDir)
val writtenJars = writeUploadedJars(uploadedJars, tempDir)
val writtenFiles = writeUploadedFiles(uploadedFiles)
val resolvedSparkProperties = new mutable.HashMap[String, String]
resolvedSparkProperties ++= sparkProperties

// Resolve driver classpath and jars
val originalJars = sparkProperties.get("spark.jars")
val nonUploadedJars = sparkProperties.get("spark.jars")
.map(_.split(","))
.map(_.filter(Utils.resolveURI(_).getScheme match {
case "file" | null => false
case _ => true
}))
.getOrElse(Array.empty[String])
val resolvedJars = writtenJars ++ originalJars ++ Array(appResourcePath)
val resolvedJars = writtenJars ++ nonUploadedJars ++ Array(appResourcePath)
Review comment: Here we're missing adding jars with the local scheme to the driver's classpath. Note that while we want to add those jars to the classpath using just the raw path, they should still be kept in spark.jars with the full URI so that executors pick them up from their local disks, as opposed to having the driver upload them. (A sketch follows the jars-resolution block below.)

val sparkJars = new File(sparkHome, "jars").listFiles().map(_.getAbsolutePath)
val driverExtraClasspath = sparkProperties
.get("spark.driver.extraClassPath")
@@ -180,10 +184,14 @@ private[spark] class KubernetesSparkRestServer(
resolvedSparkProperties("spark.jars") = resolvedJars.mkString(",")

// Resolve spark.files
val originalFiles = sparkProperties.get("spark.files")
val nonUploadedFiles = sparkProperties.get("spark.files")
.map(_.split(","))
.map(_.filter(Utils.resolveURI(_).getScheme match {
Review comment: This is a common enough pattern that we could think about separating it out into a separate class. Also, some other places in core Spark have used this paradigm:

Option(Utils.resolveURI(file).getScheme).getOrElse("file") match ...

Reply from mccheah (Feb 11, 2017):
Using the above we can shorthand a lot of things and avoid the case-match entirely. For example:

Option(Utils.resolveURI(file).getScheme).getOrElse("file") != "file"

(A sketch of such a helper appears after the files-resolution block below.)

case "file" | null => false
case _ => true
}))
.getOrElse(Array.empty[String])
val resolvedFiles = originalFiles ++ writtenFiles
val resolvedFiles = nonUploadedFiles ++ writtenFiles
resolvedSparkProperties("spark.files") = resolvedFiles.mkString(",")

val command = new ArrayBuffer[String]
@@ -250,7 +258,7 @@ private[spark] class KubernetesSparkRestServer(
writeBase64ContentsToFiles(files, workingDir)
}

def resolvedAppResource(appResource: AppResource, tempDir: File): String = {
def resolveAppResourceToLocalPath(appResource: AppResource, tempDir: File): String = {
val appResourcePath = appResource match {
case UploadedAppResource(resourceContentsBase64, resourceName) =>
val resourceFile = new File(tempDir, resourceName)