89 changes: 78 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,6 +22,7 @@ import java.net.URI
import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import javax.ws.rs.core.UriBuilder

import scala.collection.JavaConverters._
import scala.collection.Map
@@ -39,7 +40,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
@@ -221,6 +222,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _listenerBusStarted: Boolean = false
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _archives: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _heartbeater: Heartbeater = _
@@ -246,6 +248,7 @@ class SparkContext(config: SparkConf) extends Logging {

def jars: Seq[String] = _jars
def files: Seq[String] = _files
def archives: Seq[String] = _archives
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.get(SUBMIT_DEPLOY_MODE)
def appName: String = _conf.get("spark.app.name")
@@ -278,6 +281,7 @@ class SparkContext(config: SparkConf) extends Logging {

// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedArchives = new ConcurrentHashMap[String, Long]().asScala
private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala

// Keeps track of all persisted RDDs
@@ -422,6 +426,7 @@ class SparkContext(config: SparkConf) extends Logging {
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten
_archives = _conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten

_eventLogDir =
if (isEventLogEnabled) {
@@ -506,6 +511,13 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

if (archives != null) {
archives.foreach(file => addFile(file, false, true, isArchive = true))
if (addedArchives.nonEmpty) {
_conf.set("spark.app.initial.archive.urls", addedArchives.keys.toSeq.mkString(","))
}
}

_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
@@ -1520,6 +1532,36 @@ class SparkContext(config: SparkConf) extends Logging {
*/
def listFiles(): Seq[String] = addedFiles.keySet.toSeq

/**
* :: Experimental ::
* Add an archive to be downloaded and unpacked with this Spark job on every node.
*
* If an archive is added during execution, it will not be available until the next TaskSet
* starts.
*
* @param path can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(paths-to-files)` to find its download/unpacked location.
* The given path should be one of .zip, .tar, .tar.gz, .tgz and .jar.
*
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*
* @since 3.1.0
*/
@Experimental
def addArchive(path: String): Unit = {
addFile(path, false, false, isArchive = true)
}

/**
* :: Experimental ::
* Returns a list of archive paths that are added to resources.
*
* @since 3.1.0
*/
@Experimental
def listArchives(): Seq[String] = addedArchives.keySet.toSeq
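
For context (not part of this diff): a minimal usage sketch of the new addArchive/listArchives API, following the scaladoc above. The archive location and the "#mydata" directory name are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("addArchive-sketch"))

// Hypothetical archive; the "#mydata" fragment names the directory it is unpacked into.
sc.addArchive("hdfs:///tmp/data.zip#mydata")

// Closures running on the executors (or the driver) locate the unpacked directory via SparkFiles.
sc.parallelize(1 to 2).foreach { _ =>
  val unpackedDir = SparkFiles.get("mydata")
  // ... read files under unpackedDir ...
}

// Lists the archive URIs added so far.
sc.listArchives()
```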

/**
* Add a file to be downloaded with this Spark job on every node.
*
@@ -1537,8 +1579,14 @@
addFile(path, recursive, false)
}

private def addFile(path: String, recursive: Boolean, addedOnSubmit: Boolean): Unit = {
val uri = new Path(path).toUri
private def addFile(
path: String, recursive: Boolean, addedOnSubmit: Boolean, isArchive: Boolean = false
): Unit = {
val uri = if (!isArchive) {
new Path(path).toUri
} else {
Utils.resolveURI(path)
Author comment: Here we cannot rely on new Path(path).toUri, because it treats the fragment (#) in the URI as part of the path. Utils.resolveURI is also what is used for spark.yarn.dist.archives.
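
To illustrate this point, a small sketch (an illustration only, assuming standard Hadoop Path and java.net.URI behavior; Utils.resolveURI is Spark's internal helper that ultimately relies on java.net.URI):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Hadoop's Path does not treat '#' as a fragment separator: the fragment stays
// embedded in the path component and getFragment returns null.
val viaPath: URI = new Path("/tmp/deps.zip#mydir").toUri
viaPath.getFragment          // null
viaPath.getPath              // "/tmp/deps.zip#mydir"

// java.net.URI keeps the fragment separate, which is what allows "#mydir" to
// name the directory an archive is unpacked into.
val viaUri = new URI("file:/tmp/deps.zip#mydir")
viaUri.getFragment           // "mydir"
viaUri.getPath               // "/tmp/deps.zip"
```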

}
val schemeCorrectedURI = uri.getScheme match {
case null => new File(path).getCanonicalFile.toURI
case "local" =>
@@ -1550,7 +1598,7 @@

val hadoopPath = new Path(schemeCorrectedURI)
val scheme = schemeCorrectedURI.getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
if (!Array("http", "https", "ftp").contains(scheme) && !isArchive) {
Author comment: An archive is not supposed to be a directory.

val fs = hadoopPath.getFileSystem(hadoopConfiguration)
val isDir = fs.getFileStatus(hadoopPath).isDirectory
if (!isLocal && scheme == "file" && isDir) {
@@ -1568,21 +1616,39 @@

val key = if (!isLocal && scheme == "file") {
env.rpcEnv.fileServer.addFile(new File(uri.getPath))
} else if (uri.getScheme == null) {
schemeCorrectedURI.toString
} else if (isArchive) {
uri.toString
Author comment: For the same reason of keeping the fragment, the URI string is used as the key when the path is an archive.

} else {
if (uri.getScheme == null) {
schemeCorrectedURI.toString
} else {
path
}
path
}

val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
if (!isArchive && addedFiles.putIfAbsent(key, timestamp).isEmpty) {
logInfo(s"Added file $path at $key with timestamp $timestamp")
// Fetch the file locally so that closures which are run on the driver can still use the
// SparkFiles API to access files.
Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false)
postEnvironmentUpdate()
} else if (
isArchive &&
addedArchives.putIfAbsent(
UriBuilder.fromUri(new URI(key)).fragment(uri.getFragment).build().toString,
timestamp).isEmpty) {
logInfo(s"Added archive $path at $key with timestamp $timestamp")
val uriToDownload = UriBuilder.fromUri(new URI(key)).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
env.securityManager, hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
val dest = new File(
SparkFiles.getRootDirectory(),
if (uri.getFragment != null) uri.getFragment else source.getName)
logInfo(
s"Unpacking an archive $path from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
postEnvironmentUpdate()
} else {
logWarning(s"The path $path has been added already. Overwriting of added paths " +
"is not supported in the current version.")
@@ -2494,8 +2560,9 @@ class SparkContext(config: SparkConf) extends Logging {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
val addedArchivePaths = addedArchives.keys.toSeq
val environmentDetails = SparkEnv.environmentDetails(conf, hadoopConfiguration,
schedulingMode, addedJarPaths, addedFilePaths)
schedulingMode, addedJarPaths, addedFilePaths, addedArchivePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -454,7 +454,8 @@ object SparkEnv extends Logging {
hadoopConf: Configuration,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
addedFiles: Seq[String],
addedArchives: Seq[String]): Map[String, Seq[(String, String)]] = {

import Properties._
val jvmInformation = Seq(
@@ -484,7 +485,7 @@ object SparkEnv extends Logging {
.split(File.pathSeparator)
.filterNot(_.isEmpty)
.map((_, "System Classpath"))
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val addedJarsAndFiles = (addedJars ++ addedFiles ++ addedArchives).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

// Add Hadoop properties, it will not ignore configs including in Spark. Some spark
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -607,6 +607,8 @@ private[spark] class SparkSubmit extends Logging {
confKey = CORES_MAX.key),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = FILES.key),
OptionAssigner(args.archives, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = ARCHIVES.key),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
confKey = JARS.key),
@@ -796,6 +798,7 @@ private[spark] class SparkSubmit extends Logging {
val pathConfigs = Seq(
JARS.key,
FILES.key,
ARCHIVES.key,
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -183,6 +183,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
jars = Option(jars).orElse(sparkProperties.get(config.JARS.key)).orNull
files = Option(files).orElse(sparkProperties.get(config.FILES.key)).orNull
archives = Option(archives).orElse(sparkProperties.get(config.ARCHIVES.key)).orNull
pyFiles = Option(pyFiles).orElse(sparkProperties.get(config.SUBMIT_PYTHON_FILES.key)).orNull
ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
@@ -512,6 +513,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor. File paths of these files
| in executors can be accessed via SparkFiles.get(fileName).
| --archives ARCHIVES Comma-separated list of archives to be extracted into the
| working directory of each executor.
|
| --conf, -c PROP=VALUE Arbitrary Spark configuration property.
| --properties-file FILE Path to a file from which to load extra properties. If not
@@ -562,8 +565,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
|
| Spark on YARN only:
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
""".stripMargin
)

50 changes: 36 additions & 14 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -26,6 +26,7 @@ import java.util.{Locale, Properties}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.GuardedBy
import javax.ws.rs.core.UriBuilder

import scala.collection.JavaConverters._
import scala.collection.immutable
@@ -78,6 +79,7 @@ private[spark] class Executor(
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
private val currentArchives: HashMap[String, Long] = new HashMap[String, Long]()

private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))

@@ -230,16 +232,17 @@ private[spark] class Executor(
private val appStartTime = conf.getLong("spark.app.startTime", 0)

// To allow users to distribute plugins and their required files
// specified by --jars and --files on application submission, those jars/files should be
// downloaded and added to the class loader via updateDependencies.
// This should be done before plugin initialization below
// specified by --jars, --files and --archives on application submission, those
// jars/files/archives should be downloaded and added to the class loader via
// updateDependencies. This should be done before plugin initialization below
// because executors search plugins from the class loader and initialize them.
private val Seq(initialUserJars, initialUserFiles) = Seq("jar", "file").map { key =>
conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
Map(urls.split(",").map(url => (url, appStartTime)): _*)
}.getOrElse(Map.empty)
}
updateDependencies(initialUserFiles, initialUserJars)
private val Seq(initialUserJars, initialUserFiles, initialUserArchives) =
Seq("jar", "file", "archive").map { key =>
conf.getOption(s"spark.app.initial.$key.urls").map { urls =>
Map(urls.split(",").map(url => (url, appStartTime)): _*)
}.getOrElse(Map.empty)
}
updateDependencies(initialUserFiles, initialUserJars, initialUserArchives)

// Plugins need to load using a class loader that includes the executor's user classpath.
// Plugins also needs to be initialized after the heartbeater started
@@ -447,7 +450,8 @@ private[spark] class Executor(
// requires access to properties contained within (e.g. for access control).
Executor.taskDeserializationProps.set(taskDescription.properties)

updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
updateDependencies(
taskDescription.addedFiles, taskDescription.addedJars, taskDescription.addedArchives)
task = ser.deserialize[Task[Any]](
taskDescription.serializedTask, Thread.currentThread.getContextClassLoader)
task.localProperties = taskDescription.properties
@@ -907,32 +911,50 @@ private[spark] class Executor(
* Download any missing dependencies if we receive a new set of files and JARs from the
* SparkContext. Also adds any new JARs we fetched to the class loader.
*/
private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]): Unit = {
private def updateDependencies(
newFiles: Map[String, Long],
newJars: Map[String, Long],
newArchives: Map[String, Long]): Unit = {
lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
synchronized {
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
logInfo(s"Fetching $name with timestamp $timestamp")
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, -1L) < timestamp) {
logInfo(s"Fetching $name with timestamp $timestamp")
val sourceURI = new URI(name)
val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
val dest = new File(
SparkFiles.getRootDirectory(),
if (sourceURI.getFragment != null) sourceURI.getFragment else source.getName)
logInfo(
s"Unpacking an archive $name from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
Utils.deleteRecursively(dest)
Utils.unpack(source, dest)
currentArchives(name) = timestamp
}
for ((name, timestamp) <- newJars) {
val localName = new URI(name).getPath.split("/").last
val currentTimeStamp = currentJars.get(name)
.orElse(currentJars.get(localName))
.getOrElse(-1L)
if (currentTimeStamp < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
logInfo(s"Fetching $name with timestamp $timestamp")
// Fetch file with useCache mode, close cache for local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentJars(name) = timestamp
// Add it to our class loader
val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
if (!urlClassLoader.getURLs().contains(url)) {
logInfo("Adding " + url + " to class loader")
logInfo(s"Adding $url to class loader")
urlClassLoader.addURL(url)
}
}
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1803,6 +1803,16 @@ package object config {
.toSequence
.createWithDefault(Nil)

private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
.version("3.1.0")
.doc("Comma-separated list of archives to be extracted into the working directory of each " +
"executor. .jar, .tar.gz, .tgz and .zip are supported. You can specify the directory " +
"name to unpack via adding '#' after the file name to unpack, for example, " +
"'file.zip#directory'. This configuration is experimental.")
.stringConf
.toSequence
.createWithDefault(Nil)
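
For context (not part of this diff): a sketch of how this option might be set programmatically; the archive location and the "#env" directory name are hypothetical. The same value can also be passed on the command line via spark-submit --archives.

```scala
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

// Hypothetical archive; the "#env" fragment names the directory the archive is
// extracted into in each executor's working directory.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("spark.archives-sketch")
  .set("spark.archives", "hdfs:///tmp/deps.tar.gz#env")

val sc = new SparkContext(conf)

// The unpacked contents can then be located with SparkFiles.
val envDir = SparkFiles.get("env")
```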

private[spark] val SUBMIT_DEPLOY_MODE = ConfigBuilder("spark.submit.deployMode")
.version("1.5.0")
.stringConf