diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 67a5c95400e53..fe8ad2b2d012b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -322,7 +322,7 @@ private[spark] class Client( destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName()) val destPath = copyFileToRemote(dst, localPath, replication) distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache, + hadoopConf, destPath, localResources, resType, linkname, statCache, appMasterOnly = appMasterOnly) (false, linkname) } else { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 3d3a966960e9f..ce83e5b26817c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -46,7 +46,6 @@ private[spark] class ClientDistributedCacheManager() extends Logging { * Adds the LocalResource to the localResources HashMap passed in and saves * the stats of the resources to they can be sent to the executors and verified. * - * @param fs FileSystem * @param conf Configuration * @param destPath path to the resource * @param localResources localResource hashMap to insert the resource into @@ -56,7 +55,6 @@ private[spark] class ClientDistributedCacheManager() extends Logging { * @param appMasterOnly Whether to only add the resource to the app master */ def addResource( - fs: FileSystem, conf: Configuration, destPath: Path, localResources: HashMap[String, LocalResource], @@ -64,6 +62,7 @@ private[spark] class ClientDistributedCacheManager() extends Logging { link: String, statCache: Map[URI, FileStatus], appMasterOnly: Boolean = false): Unit = { + val fs = FileSystem.get(destPath.toUri(), conf) val destStatus = fs.getFileStatus(destPath) val amJarRsrc = Records.newRecord(classOf[LocalResource]) amJarRsrc.setType(resourceType) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index 804dfecde7867..635bed4441883 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -78,7 +78,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", + distMgr.addResource(conf, destPath, localResources, LocalResourceType.FILE, "link", statCache, false) val resource = localResources("link") assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) @@ -105,7 +105,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar null, new Path("/tmp/testing2")) val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2") when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus) - distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", + distMgr.addResource(conf, destPath2, localResources, LocalResourceType.FILE, "link2", statCache, false) val resource2 = localResources("link2") assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE) @@ -141,7 +141,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) intercept[Exception] { - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, + distMgr.addResource(conf, destPath, localResources, LocalResourceType.FILE, null, statCache, false) } assert(localResources.get("link") === None) @@ -159,7 +159,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar null, new Path("/tmp/testing")) when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", + distMgr.addResource(conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", statCache, true) val resource = localResources("link") assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) @@ -193,7 +193,7 @@ class ClientDistributedCacheManagerSuite extends SparkFunSuite with MockitoSugar null, new Path("/tmp/testing")) when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) - distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", + distMgr.addResource(conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", statCache, false) val resource = localResources("link") assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE)