5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -805,11 +805,12 @@ class SparkContext(config: SparkConf) extends Logging {
case "local" => "file:" + uri.getPath
case _ => path
}
addedFiles(key) = System.currentTimeMillis
val timestamp = System.currentTimeMillis
addedFiles(key) = timestamp

// Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
hadoopConfiguration)
hadoopConfiguration, timestamp, useCache = false)
Contributor
What are the semantics of the cache here? Why can't we use the same code path (i.e. "true") in this case?

Contributor Author
It's for DAGScheduler.runLocally(). I think it doesn't need to use the cache because it only runs once, in the driver's JVM container.


logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
postEnvironmentUpdate()
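For context, the addFile path above backs Spark's user-facing file-distribution API. A minimal usage sketch (the HDFS path and file name are made up for illustration) might look like this:

import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

// Driver side: register a file; SparkContext.addFile records its timestamp and
// schedules it for distribution to every executor in the application.
val sc = new SparkContext(new SparkConf().setAppName("fetch-file-demo").setMaster("local[2]"))
sc.addFile("hdfs://namenode:8020/data/lookup.txt")   // hypothetical path

// Task side: resolve the locally fetched copy by file name via SparkFiles.get.
sc.parallelize(1 to 4).foreach { _ =>
  val localPath = SparkFiles.get("lookup.txt")
  println(s"task sees file at $localPath")
}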
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -322,14 +322,16 @@ private[spark] class Executor(
// Fetch missing dependencies
for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
// Fetch the file with useCache enabled; disable the cache in local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentFiles(name) = timestamp
}
for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
logInfo("Fetching " + name + " with timestamp " + timestamp)
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf, env.securityManager,
hadoopConf)
// Fetch the file with useCache enabled; disable the cache in local mode.
Utils.fetchFile(name, new File(SparkFiles.getRootDirectory), conf,
env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
currentJars(name) = timestamp
// Add it to our class loader
val localName = name.split("/").last
87 changes: 73 additions & 14 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -313,15 +313,84 @@ private[spark] object Utils extends Logging {
}

/**
* Download a file requested by the executor. Supports fetching the file in a variety of ways,
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* If `useCache` is true, first attempts to fetch the file to a local cache that's shared
* across executors running the same application. `useCache` is used mainly for
* the executors, and not in local mode.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
def fetchFile(url: String, targetDir: File, conf: SparkConf, securityMgr: SecurityManager,
hadoopConf: Configuration) {
val filename = url.split("/").last
def fetchFile(
url: String,
targetDir: File,
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration,
timestamp: Long,
useCache: Boolean) {
val fileName = url.split("/").last
val targetFile = new File(targetDir, fileName)
if (useCache) {
val cachedFileName = s"${url.hashCode}${timestamp}_cache"
val lockFileName = s"${url.hashCode}${timestamp}_lock"
val localDir = new File(getLocalDir(conf))
val lockFile = new File(localDir, lockFileName)
Contributor
Why do we need a lock file? This seems a little expensive.

Contributor
I think the idea here is that multiple executor JVMs are running on the same machine and we only want to download one copy of the file to the shared cache, so we use a lock file as a form of interprocess synchronization.

val raf = new RandomAccessFile(lockFile, "rw")
// Only one executor may enter at a time.
// The FileLock is only used to coordinate executors downloading the file,
// so it is safe regardless of lock type (mandatory or advisory).
val lock = raf.getChannel().lock()
val cachedFile = new File(localDir, cachedFileName)
try {
if (!cachedFile.exists()) {
doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
}
} finally {
lock.release()
}
if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
Contributor
Shouldn't you check whether targetFile exists before you actually fetch the file? I thought the whole point of this PR is that all executors on the same node use the same jars, and only the first executor to call fetchFile actually fetches the jars, so if the jar already exists then we shouldn't fetch it again just to compare it with the target file.

Contributor
To fix this, however, we will have to assume that the resources we fetch will not change over time. This may or may not be true.

Contributor Author
This code just follows https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L354
I think we need to fetch the file first and then compare the cached file's contents with the target file's contents (!Files.equal(cachedFile, targetFile)), because we can't be sure the file's contents haven't changed when the timestamp has changed.

Contributor
So even with this PR each executor still fetches the jars. I thought the whole point of this PR is to avoid that. Is that not the case?

Contributor Author
This PR tries to avoid redundant network transfers (multiple executors download a file over the network to local storage only once). So each executor still fetches files into its own work directory, but only from local storage, not from a remote node over the network.

Contributor
I see, the cached file is shared across executors, so the doFetchFile in L348 should only be called once on each node for all executors.
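To tie the thread together, here is a minimal, self-contained sketch of the interaction being described (the download parameter is a hypothetical stand-in for doFetchFile, not a Spark API): the lock file serializes executor JVMs on a node, the first one populates the shared cache over the network, and every executor then performs only a local copy into its own work directory.

import java.io.{File, RandomAccessFile}
import com.google.common.io.Files

object NodeCacheSketch {
  def fetchViaNodeCache(
      url: String,
      timestamp: Long,
      cacheDir: File,
      targetFile: File)(download: (String, File) => Unit): Unit = {
    val cachedFile = new File(cacheDir, s"${url.hashCode}${timestamp}_cache")
    val lockFile = new File(cacheDir, s"${url.hashCode}${timestamp}_lock")
    val raf = new RandomAccessFile(lockFile, "rw")
    val lock = raf.getChannel.lock()   // blocks until this JVM holds the node-wide lock
    try {
      if (!cachedFile.exists()) {
        download(url, cachedFile)      // only the first executor on the node pays the network cost
      }
    } finally {
      lock.release()
      raf.close()
    }
    Files.copy(cachedFile, targetFile) // every executor makes a purely local copy
  }
}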

if (conf.getBoolean("spark.files.overwrite", false)) {
targetFile.delete()
logInfo((s"File $targetFile exists and does not match contents of $url, " +
s"replacing it with $url"))
} else {
throw new SparkException(s"File $targetFile exists and does not match contents of $url")
}
}
Files.copy(cachedFile, targetFile)
Contributor
How about avoiding the copy and also the redundant decompression below by always using the cached location if it exists?

Contributor Author
I think it's OK, but right now executors use these files as if they are in their work directory (./). Maybe we can optimize away the copy in a follow-up patch once we've shown this patch works well.

} else {
doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
}

// Decompress the file if it's a .tar or .tar.gz
if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
logInfo("Untarring " + fileName)
Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
} else if (fileName.endsWith(".tar")) {
logInfo("Untarring " + fileName)
Utils.execute(Seq("tar", "-xf", fileName), targetDir)
}
// Make the file executable - That's necessary for scripts
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
Contributor
Is there a reason why this chunk of code is moved here? If the first executor untars it already, then the second executor doesn't need to do it again, right?

Contributor
I think this is related to the discussion above about using the cache to avoid the network fetch, but still copying the file from the shared cache into the executor's work directory. In that case, I think we need to perform the decompression on the file in the work directory.

Contributor
Ah ok. I guess that's faster than copying the whole untarred directory.

}

/**
* Download a file to target directory. Supports fetching the file in a variety of ways,
* including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
*
* Throws SparkException if the target file already exists and has different contents than
* the requested file.
*/
private def doFetchFile(
url: String,
targetDir: File,
filename: String,
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration) {
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val targetFile = new File(targetDir, filename)
@@ -409,16 +478,6 @@ private[spark] object Utils extends Logging {
}
Files.move(tempFile, targetFile)
}
// Decompress the file if it's a .tar or .tar.gz
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
logInfo("Untarring " + filename)
Utils.execute(Seq("tar", "-xzf", filename), targetDir)
} else if (filename.endsWith(".tar")) {
logInfo("Untarring " + filename)
Utils.execute(Seq("tar", "-xf", filename), targetDir)
}
// Make the file executable - That's necessary for scripts
FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
}

/**