Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ private[spark] class Client(
destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
val destPath = copyFileToRemote(dst, localPath, replication)
distCacheMgr.addResource(
fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
hadoopConf, destPath, localResources, resType, linkname, statCache,
appMasterOnly = appMasterOnly)
(false, linkname)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* Adds the LocalResource to the localResources HashMap passed in and saves
* the stats of the resources to they can be sent to the executors and verified.
*
* @param fs FileSystem
* @param conf Configuration
* @param destPath path to the resource
* @param localResources localResource hashMap to insert the resource into
Expand All @@ -56,14 +55,14 @@ private[spark] class ClientDistributedCacheManager() extends Logging {
* @param appMasterOnly Whether to only add the resource to the app master
*/
def addResource(
fs: FileSystem,
conf: Configuration,
destPath: Path,
localResources: HashMap[String, LocalResource],
resourceType: LocalResourceType,
link: String,
statCache: Map[URI, FileStatus],
appMasterOnly: Boolean = false): Unit = {
val fs = FileSystem.get(destPath.toUri(), conf)
val destStatus = fs.getFileStatus(destPath)
val amJarRsrc = Records.newRecord(classOf[LocalResource])
amJarRsrc.setType(resourceType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())

distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link",
distMgr.addResource(conf, destPath, localResources, LocalResourceType.FILE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
Expand All @@ -105,7 +105,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
null, new Path("/tmp/testing2"))
val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2")
when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus)
distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2",
distMgr.addResource(conf, destPath2, localResources, LocalResourceType.FILE, "link2",
statCache, false)
val resource2 = localResources("link2")
assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE)
Expand Down Expand Up @@ -141,7 +141,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
when(fs.getFileStatus(destPath)).thenReturn(new FileStatus())

intercept[Exception] {
distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null,
distMgr.addResource(conf, destPath, localResources, LocalResourceType.FILE, null,
statCache, false)
}
assert(localResources.get("link") === None)
Expand All @@ -159,7 +159,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)

distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
distMgr.addResource(conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, true)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
Expand Down Expand Up @@ -193,7 +193,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar
null, new Path("/tmp/testing"))
when(fs.getFileStatus(destPath)).thenReturn(realFileStatus)

distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
distMgr.addResource(conf, destPath, localResources, LocalResourceType.ARCHIVE, "link",
statCache, false)
val resource = localResources("link")
assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)
Expand Down