diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 7d49f10d44823..e686e079d1a19 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -700,8 +700,6 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0")),
- "spark.yarn.access.hadoopFileSystems" -> Seq(
- AlternateConfig("spark.yarn.access.namenodes", "2.2")),
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
@@ -715,7 +713,10 @@ private[spark] object SparkConf extends Logging {
PRINCIPAL.key -> Seq(
AlternateConfig("spark.yarn.principal", "3.0")),
KERBEROS_RELOGIN_PERIOD.key -> Seq(
- AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
+ AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
+ KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
+ AlternateConfig("spark.yarn.access.namenodes", "2.2"),
+ AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
)
/**
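
The `configsWithAlternatives` entry above is what keeps the two legacy YARN keys working: `SparkConf` resolves a read of the new key through its alternates when only a deprecated key is set, logging a deprecation warning along the way. A minimal sketch of that fallback, assuming a bare `SparkConf` with no other settings (the URI is a placeholder):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.yarn.access.namenodes", "hdfs://nn1.example.com:8020")

// The value set under the key deprecated in 2.2 is visible through the
// key introduced by this patch, thanks to the AlternateConfig mapping.
assert(conf.get("spark.kerberos.access.hadoopFileSystems") == "hdfs://nn1.example.com:8020")
```
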
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
index e345b0bbffeae..d53eb4efe529b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala
@@ -39,7 +39,6 @@ private[security] class HBaseDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
- fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 487291e176059..6a18a8dd33d1a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -144,7 +144,7 @@ private[spark] class HadoopDelegationTokenManager(
def obtainDelegationTokens(creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
- provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds)
+ provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
@@ -181,14 +181,6 @@ private[spark] class HadoopDelegationTokenManager(
.getOrElse(isEnabledDeprecated)
}
- /**
- * List of file systems for which to obtain delegation tokens. The base implementation
- * returns just the default file system in the given Hadoop configuration.
- */
- protected def fileSystemsToAccess(): Set[FileSystem] = {
- Set(FileSystem.get(hadoopConf))
- }
-
private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
index cb4c97bdc4c1a..3dc952d54e730 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
@@ -44,13 +44,11 @@ private[spark] trait HadoopDelegationTokenProvider {
* Obtain delegation tokens for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param creds Credentials to add tokens and security keys to.
- * @param fileSystems List of file systems for which to obtain delegation tokens.
* @return If the returned tokens are renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
- fileSystems: Set[FileSystem],
creds: Credentials): Option[Long]
}
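
Dropping `fileSystems` slims the provider contract down to the two configurations plus the credentials to populate. Since the trait is `private[spark]`, implementations live inside Spark's own packages (the test suite's stub below does the same); this do-nothing provider is purely illustrative, not part of the patch:

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Illustrative stub compiled against the new three-argument signature.
private class NoopDelegationTokenProvider extends HadoopDelegationTokenProvider {
  override def serviceName: String = "noop"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = false

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // A real provider adds tokens to `creds` and returns the time of the
    // next renewal, or None when the obtained tokens are not renewable.
    None
  }
}
```
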
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 3bc40de776ec9..725eefbda897b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -22,7 +22,7 @@ import scala.util.Try
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
@@ -44,9 +44,9 @@ private[deploy] class HadoopFSDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
- fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
+ val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)
// Get the token renewal interval if it is not set. It will only be called once.
@@ -133,3 +133,44 @@ private[deploy] class HadoopFSDelegationTokenProvider
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}
+
+private[deploy] object HadoopFSDelegationTokenProvider {
+ def hadoopFSsToAccess(
+ sparkConf: SparkConf,
+ hadoopConf: Configuration): Set[FileSystem] = {
+ val filesystemsToAccess = sparkConf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)
+
+ val defaultFS = FileSystem.get(hadoopConf)
+ val master = sparkConf.get("spark.master", null)
+ val stagingFS = if (master != null && master.contains("yarn")) {
+ sparkConf.get(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf))
+ } else {
+ None
+ }
+
+ // Add the list of available namenodes for all namespaces in HDFS federation.
+ // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+ // namespaces.
+ val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || defaultFS.getScheme == "viewfs" ||
+ (stagingFS.isDefined && stagingFS.get.getScheme == "viewfs")) {
+ filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
+ } else {
+ val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+ // Retrieving the filesystem for the nameservices where HA is not enabled
+ val filesystemsWithoutHA = nameservices.flatMap { ns =>
+ Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
+ new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
+ }
+ }
+ // Retrieving the filesystem for the nameservices where HA is enabled
+ val filesystemsWithHA = nameservices.flatMap { ns =>
+ Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
+ new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
+ }
+ }
+ (filesystemsWithoutHA ++ filesystemsWithHA).toSet
+ }
+
+ hadoopFilesystems ++ stagingFS + defaultFS
+ }
+}
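
The new `hadoopFSsToAccess` helper resolves token targets in a fixed order: an explicit `spark.kerberos.access.hadoopFileSystems` list (or a `viewfs` default/staging filesystem) short-circuits the federation lookup; otherwise every `dfs.nameservices` entry contributes either its non-HA `rpc-address` or its HA nameservice URI; the default filesystem and any YARN staging directory are always appended. A sketch of the short-circuit case with placeholder hosts (the method is `private[deploy]`, so callers sit in the same package, as the new suite does):

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.KERBEROS_FILESYSTEMS_TO_ACCESS

val sparkConf = new SparkConf()
  .set(KERBEROS_FILESYSTEMS_TO_ACCESS, Seq("hdfs://nn1:8020", "webhdfs://nn3:50070"))
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://nn0:8020")
// Ignored below: the explicit list wins over the federation lookup.
hadoopConf.set("dfs.nameservices", "ns1")
hadoopConf.set("dfs.namenode.rpc-address.ns1", "nn9:8020")

val fss = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
// fss covers hdfs://nn0:8020 (defaultFS), hdfs://nn1:8020 and webhdfs://nn3:50070;
// hdfs://nn9:8020 gets no token because dfs.nameservices was never consulted.
```
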
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 66b1ff76446aa..3ef6cba8193f6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -364,6 +364,14 @@ package object config {
.checkValues(Set("keytab", "ccache"))
.createWithDefault("keytab")
+ private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS =
+ ConfigBuilder("spark.kerberos.access.hadoopFileSystems")
+ .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
+ "that hosts fs.defaultFS does not need to be listed here.")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
@@ -1253,4 +1261,9 @@ package object config {
ConfigBuilder("spark.speculation.quantile")
.doubleConf
.createWithDefault(0.75)
+
+ private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
+ .doc("Staging directory used while submitting applications.")
+ .stringConf
+ .createOptional
}
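
Because the entry is declared `.stringConf.toSequence`, a comma-separated value arrives as a `Seq[String]` when read through the typed key; a quick sketch:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.KERBEROS_FILESYSTEMS_TO_ACCESS

val conf = new SparkConf()
conf.set("spark.kerberos.access.hadoopFileSystems",
  "hdfs://nn1.com:8032,webhdfs://nn3.com:50070")

// toSequence splits on commas, so the typed read yields the parsed list.
assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) ==
  Seq("hdfs://nn1.com:8032", "webhdfs://nn3.com:50070"))
```
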
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 079170d47d0b2..c187bcb577016 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -264,6 +264,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)
+
+ conf.set("spark.yarn.access.namenodes", "testNode")
+ assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
+
+ conf.set("spark.yarn.access.hadoopFileSystems", "testNode")
+ assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
}
test("akka deprecated configs") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index 9cc4a914b90e4..2f36dba05c64e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.security
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -36,7 +35,6 @@ private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationT
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
- fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = throw new IllegalArgumentException
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala
new file mode 100644
index 0000000000000..0f1a9168e2542
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.security
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.scalatest.Matchers
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.STAGING_DIR
+
+class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers {
+ test("hadoopFSsToAccess should return defaultFS even if not configured") {
+ val sparkConf = new SparkConf()
+ val defaultFS = "hdfs://localhost:8020"
+ val statingDir = "hdfs://localhost:8021"
+ sparkConf.set("spark.master", "yarn-client")
+ sparkConf.set(STAGING_DIR, statingDir)
+ val hadoopConf = new Configuration()
+ hadoopConf.set("fs.defaultFS", defaultFS)
+ val expected = Set(
+ new Path(defaultFS).getFileSystem(hadoopConf),
+ new Path(statingDir).getFileSystem(hadoopConf)
+ )
+ val result = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
+ result should be (expected)
+ }
+
+ test("SPARK-24149: retrieve all namenodes from HDFS") {
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.master", "yarn-client")
+ val basicFederationConf = new Configuration()
+ basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+ basicFederationConf.set("dfs.nameservices", "ns1,ns2")
+ basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
+ basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
+ val basicFederationExpected = Set(
+ new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
+ new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
+ val basicFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
+ sparkConf, basicFederationConf)
+ basicFederationResult should be (basicFederationExpected)
+
+ // when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them
+ val viewFsConf = new Configuration()
+ viewFsConf.addResource(basicFederationConf)
+ viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
+ viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
+ val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
+ HadoopFSDelegationTokenProvider
+ .hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)
+
+ // invalid config should not throw NullPointerException
+ val invalidFederationConf = new Configuration()
+ invalidFederationConf.addResource(basicFederationConf)
+ invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
+ val invalidFederationExpected = Set(
+ new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
+ val invalidFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
+ sparkConf, invalidFederationConf)
+ invalidFederationResult should be (invalidFederationExpected)
+
+ // no namespaces defined, i.e. the old case
+ val noFederationConf = new Configuration()
+ noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
+ val noFederationExpected = Set(
+ new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
+ val noFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf,
+ noFederationConf)
+ noFederationResult should be (noFederationExpected)
+
+ // federation and HA enabled
+ val federationAndHAConf = new Configuration()
+ federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
+ federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
+ federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
+ federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
+ federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
+ federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
+ federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
+ federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
+ federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
+ "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+ federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
+ "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
+
+ val federationAndHAExpected = Set(
+ new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
+ new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
+ val federationAndHAResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
+ sparkConf, federationAndHAConf)
+ federationAndHAResult should be (federationAndHAExpected)
+ }
+}
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index a7a448fbeb65e..026c289cdb9b7 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -490,10 +490,6 @@ for:
filesystem if `spark.yarn.stagingDir` is not set);
- if Hadoop federation is enabled, all the federated filesystems in the configuration.
-If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
-explicitly provided to Spark at launch time. This is done by listing them in the
-`spark.yarn.access.hadoopFileSystems` property, described in the configuration section below.
-
The YARN integration also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
`org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark
@@ -527,18 +523,6 @@ providers can be disabled individually by setting `spark.security.credentials.{s
(Works also with the "local" master.)
   </td>
 </tr>
-<tr>
-  <td><code>spark.yarn.access.hadoopFileSystems</code></td>
-  <td>(none)</td>
-  <td>
-    A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For
-    example, <code>spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
-    webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
-    and Kerberos must be properly configured to be able to access them (either in the same realm
-    or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
-    the Spark application can access those remote Hadoop filesystems.
-  </td>
-</tr>
 <tr>
   <td><code>spark.yarn.kerberos.relogin.period</code></td>
   <td>1m</td>
diff --git a/docs/security.md b/docs/security.md
--- a/docs/security.md
+++ b/docs/security.md
@@ ... @@
+<tr>
+  <td><code>spark.kerberos.access.hadoopFileSystems</code></td>
+  <td>(none)</td>
+  <td>
+    A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For
+    example, <code>spark.kerberos.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
+    webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
+    and Kerberos must be properly configured to be able to access them (either in the same realm
+    or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
+    the Spark application can access those remote Hadoop filesystems.
+  </td>
+</tr>
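
From an application's point of view the rename is the only visible change; a hedged end-to-end sketch of a kerberized job reading a second HDFS cluster (the cluster URI and input path are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cross-cluster-read")
  // Request a delegation token for the remote cluster at launch time.
  .config("spark.kerberos.access.hadoopFileSystems", "hdfs://nn2.example.com:8020")
  .getOrCreate()

// With the token in place, reads against the remote cluster authenticate
// just like reads against the default filesystem.
val events = spark.read.parquet("hdfs://nn2.example.com:8020/data/events")
```
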