Commit 07ff492

Review fixes:
* Config parameter deprecation
* Return defaultFS all the time
* get("spark.master", null)
1 parent: 6eb9ab1

File tree: 5 files changed, +38 −16 lines

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 5 additions & 4 deletions
@@ -638,7 +638,8 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
         "Not used anymore. Please use spark.shuffle.service.index.cache.size"),
       DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
-      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.")
+      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."),
+      DeprecatedConfig("spark.yarn.access.namenodes", "3.0.0", "Not used anymore.")
     )

     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
@@ -700,8 +701,6 @@
       AlternateConfig("spark.akka.frameSize", "1.6")),
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
-    "spark.yarn.access.hadoopFileSystems" -> Seq(
-      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
     MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
       AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
     LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
@@ -715,7 +714,9 @@
     PRINCIPAL.key -> Seq(
       AlternateConfig("spark.yarn.principal", "3.0")),
     KERBEROS_RELOGIN_PERIOD.key -> Seq(
-      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
+      AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
+    KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
+      AlternateConfig(YARN_FILESYSTEMS_TO_ACCESS.key, "3.0"))
   )

   /**
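
The two tables work together: the pre-2.2 key becomes warn-only, while the YARN-era key stays readable through the new kerberos-scoped entry. A quick sanity-check sketch of the resulting lookup behavior (not part of the commit; the key names are real, the values illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)

    // Now only a DeprecatedConfig entry: setting it logs a warning,
    // and nothing reads it back anymore.
    conf.set("spark.yarn.access.namenodes", "hdfs://nn0:8020")

    // Still honored: registered as an AlternateConfig of the kerberos-scoped
    // key, so a read of the new key falls back to the legacy value.
    conf.set("spark.yarn.access.hadoopFileSystems", "hdfs://nn1:8020")
    assert(conf.get("spark.kerberos.access.hadoopFileSystems") == "hdfs://nn1:8020")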

core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala

Lines changed: 7 additions & 7 deletions
@@ -138,22 +138,22 @@ private[deploy] object HadoopFSDelegationTokenProvider {
   def hadoopFSsToAccess(
       sparkConf: SparkConf,
       hadoopConf: Configuration): Set[FileSystem] = {
-    val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS)
-    val requestAllDelegationTokens = filesystemsToAccess.isEmpty
+    val filesystemsToAccess = sparkConf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)

-    val master = sparkConf.get("spark.master")
+    val defaultFS = FileSystem.get(hadoopConf)
+    val master = sparkConf.get("spark.master", null)
     val stagingFS = if (master != null && master.contains("yarn")) {
       sparkConf.get(STAGING_DIR)
         .map(new Path(_).getFileSystem(hadoopConf))
-        .getOrElse(FileSystem.get(hadoopConf))
+        .getOrElse(defaultFS)
     } else {
-      FileSystem.get(hadoopConf)
+      defaultFS
     }

     // Add the list of available namenodes for all namespaces in HDFS federation.
     // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
     // namespaces.
-    val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") {
+    val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || stagingFS.getScheme == "viewfs") {
       filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
     } else {
       val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
@@ -172,6 +172,6 @@
       (filesystemsWithoutHA ++ filesystemsWithHA).toSet
     }

-    hadoopFilesystems + stagingFS
+    hadoopFilesystems + stagingFS + defaultFS
   }
 }
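
Two of the review fixes land here: spark.master is read with a null default, because the single-argument SparkConf.get throws for an unset key (for example in unit tests that never set a master), and the default filesystem is hoisted into defaultFS so it is unconditionally part of the returned set. A minimal sketch of the spark.master point (illustrative, not part of the commit):

    import org.apache.spark.SparkConf

    val conf = new SparkConf(loadDefaults = false)

    // conf.get("spark.master") would throw java.util.NoSuchElementException here;
    // the two-argument overload returns the supplied default instead, which lets
    // hadoopFSsToAccess probe for a YARN master without a try/catch.
    assert(conf.get("spark.master", null) == null)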

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 6 additions & 5 deletions
@@ -364,18 +364,19 @@ package object config {
     .checkValues(Set("keytab", "ccache"))
     .createWithDefault("keytab")

-  private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.kerberos.access.namenodes")
-    .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " +
-      "fs.defaultFS does not need to be listed here.")
+  private[spark] val YARN_FILESYSTEMS_TO_ACCESS =
+    ConfigBuilder("spark.yarn.access.hadoopFileSystems")
+      .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
+        "that hosts fs.defaultFS does not need to be listed here.")
     .stringConf
     .toSequence
     .createWithDefault(Nil)

-  private[spark] val FILESYSTEMS_TO_ACCESS =
+  private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS =
     ConfigBuilder("spark.kerberos.access.hadoopFileSystems")
       .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
         "that hosts fs.defaultFS does not need to be listed here.")
-      .fallbackConf(NAMENODES_TO_ACCESS)
+      .fallbackConf(YARN_FILESYSTEMS_TO_ACCESS)

   private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
     .intConf
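
The rename preserves compatibility through fallbackConf: an entry built this way has no default of its own, so reading its key while unset resolves to the fallback entry (and ultimately to that entry's default). A minimal sketch with hypothetical example.* keys, in the style of the pair above; ConfigBuilder is private[spark], so this fragment only compiles inside Spark's own org.apache.spark.internal.config package:

    // Hypothetical legacy entry with a real default.
    private[spark] val LEGACY_LIST = ConfigBuilder("example.legacy.list")
      .stringConf
      .toSequence
      .createWithDefault(Nil)

    // No default of its own: an unset "example.current.list" yields whatever
    // "example.legacy.list" resolves to, so existing job configs keep working.
    private[spark] val CURRENT_LIST = ConfigBuilder("example.current.list")
      .fallbackConf(LEGACY_LIST)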

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -264,6 +264,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {

     conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
     assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)
+
+    conf.set("spark.yarn.access.hadoopFileSystems", "testNode")
+    assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
   }

   test("akka deprecated configs") {

core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -22,8 +22,25 @@ import org.apache.hadoop.fs.Path
 import org.scalatest.Matchers

 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.STAGING_DIR

 class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers {
+  test("hadoopFSsToAccess should return defaultFS even if not configured") {
+    val sparkConf = new SparkConf()
+    val defaultFS = "hdfs://localhost:8020"
+    val stagingDir = "hdfs://localhost:8021"
+    sparkConf.set("spark.master", "yarn-client")
+    sparkConf.set(STAGING_DIR, stagingDir)
+    val hadoopConf = new Configuration()
+    hadoopConf.set("fs.defaultFS", defaultFS)
+    val expected = Set(
+      new Path(defaultFS).getFileSystem(hadoopConf),
+      new Path(stagingDir).getFileSystem(hadoopConf)
+    )
+    val result = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
+    result should be (expected)
+  }
+
   test("SPARK-24149: retrieve all namenodes from HDFS") {
     val sparkConf = new SparkConf()
     sparkConf.set("spark.master", "yarn-client")
