7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -700,8 +700,6 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0")),
"spark.yarn.access.hadoopFileSystems" -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2")),
MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq(
AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")),
LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq(
@@ -715,7 +713,10 @@
PRINCIPAL.key -> Seq(
AlternateConfig("spark.yarn.principal", "3.0")),
KERBEROS_RELOGIN_PERIOD.key -> Seq(
AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0"))
AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")),
KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq(
AlternateConfig("spark.yarn.access.namenodes", "2.2"),
AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0"))
)

/**
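With the mapping above, both legacy YARN keys now resolve to the renamed `spark.kerberos.access.hadoopFileSystems` entry. A minimal sketch of the fallback behaviour using only the public string-key API (the URI is a placeholder; the `SparkConfSuite` change later in this diff asserts the same thing against the typed entry):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.yarn.access.namenodes", "hdfs://nn1:8020") // deprecated since 2.2, logs a warning
// Reading through the new name finds the value via the AlternateConfig mapping above.
assert(conf.get("spark.kerberos.access.hadoopFileSystems") == "hdfs://nn1:8020")
```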
@@ -39,7 +39,6 @@ private[security] class HBaseDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -144,7 +144,7 @@ private[spark] class HadoopDelegationTokenManager(
def obtainDelegationTokens(creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds)
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
@@ -181,14 +181,6 @@
.getOrElse(isEnabledDeprecated)
}

/**
* List of file systems for which to obtain delegation tokens. The base implementation
* returns just the default file system in the given Hadoop configuration.
*/
protected def fileSystemsToAccess(): Set[FileSystem] = {
Set(FileSystem.get(hadoopConf))
}

private def scheduleRenewal(delay: Long): Unit = {
val _delay = math.max(0, delay)
logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.")
@@ -44,13 +44,11 @@ private[spark] trait HadoopDelegationTokenProvider {
* Obtain delegation tokens for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param creds Credentials to add tokens and security keys to.
* @param fileSystems List of file systems for which to obtain delegation tokens.
* @return If the returned tokens are renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long]
}
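Every implementation now conforms to this three-argument signature. A skeletal, purely illustrative provider (it has to live under `org.apache.spark` because the trait is `private[spark]`):

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Hypothetical provider that fetches nothing; it only demonstrates the trimmed signature.
private class NoopDelegationTokenProvider extends HadoopDelegationTokenProvider {
  override def serviceName: String = "noop"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = false

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = None // nothing fetched, nothing to renew
}
```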
@@ -22,7 +22,7 @@ import scala.util.Try
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
@@ -44,9 +44,9 @@ private[deploy] class HadoopFSDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)

// Get the token renewal interval if it is not set. It will only be called once.
@@ -133,3 +133,44 @@ private[deploy] class HadoopFSDelegationTokenProvider
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
}
}

private[deploy] object HadoopFSDelegationTokenProvider {
def hadoopFSsToAccess(
sparkConf: SparkConf,
hadoopConf: Configuration): Set[FileSystem] = {
val filesystemsToAccess = sparkConf.get(KERBEROS_FILESYSTEMS_TO_ACCESS)

val defaultFS = FileSystem.get(hadoopConf)
val master = sparkConf.get("spark.master", null)
val stagingFS = if (master != null && master.contains("yarn")) {
sparkConf.get(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf))
} else {
None
}

// Add the list of available namenodes for all namespaces in HDFS federation.
// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
// namespaces.
val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || defaultFS.getScheme == "viewfs" ||
(stagingFS.isDefined && stagingFS.get.getScheme == "viewfs")) {
filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet
} else {
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
// Retrieving the filesystem for the nameservices where HA is not enabled
val filesystemsWithoutHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
}
}
// Retrieving the filesystem for the nameservices where HA is enabled
val filesystemsWithHA = nameservices.flatMap { ns =>
Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
new Path(s"hdfs://$ns").getFileSystem(hadoopConf)
}
}
(filesystemsWithoutHA ++ filesystemsWithHA).toSet
}

hadoopFilesystems ++ stagingFS + defaultFS
}
}
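A quick sketch of the precedence the helper implements (URIs are placeholders; the caller must sit under `org.apache.spark.deploy` because the object is `private[deploy]`). Explicitly listed filesystems bypass federation discovery, while the default filesystem, plus the staging-directory filesystem under a YARN master, is always added:

```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.KERBEROS_FILESYSTEMS_TO_ACCESS

val sparkConf = new SparkConf()
  .set(KERBEROS_FILESYSTEMS_TO_ACCESS.key,
    "hdfs://nn2.example.com:8020,webhdfs://nn3.example.com:50070")
val hadoopConf = new Configuration() // fs.defaultFS left at its default

// The two listed filesystems come back alongside the default filesystem; the
// dfs.nameservices scan above is skipped because the explicit list is non-empty.
val filesystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
```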
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -364,6 +364,14 @@ package object config {
.checkValues(Set("keytab", "ccache"))
.createWithDefault("keytab")

private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS =
ConfigBuilder("spark.kerberos.access.hadoopFileSystems")
.doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " +
"that hosts fs.defaultFS does not need to be listed here.")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.intConf
.createOptional
@@ -1253,4 +1261,9 @@ package object config {
ConfigBuilder("spark.speculation.quantile")
.doubleConf
.createWithDefault(0.75)

private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
.doc("Staging directory used while submitting applications.")
.stringConf
.createOptional
}
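Both new entries are typed; a short sketch of how they read back (again assuming code inside `org.apache.spark`, since the entries are `private[spark]`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{KERBEROS_FILESYSTEMS_TO_ACCESS, STAGING_DIR}

val conf = new SparkConf()
val extraFileSystems: Seq[String] = conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) // Nil by default
val stagingDir: Option[String] = conf.get(STAGING_DIR) // None unless spark.yarn.stagingDir is set
```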
6 changes: 6 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -264,6 +264,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

conf.set("spark.scheduler.listenerbus.eventqueue.size", "84")
assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84)

conf.set("spark.yarn.access.namenodes", "testNode")
assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))

conf.set("spark.yarn.access.hadoopFileSystems", "testNode")
assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode"))
}

test("akka deprecated configs") {
@@ -18,7 +18,6 @@
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.{SparkConf, SparkFunSuite}
@@ -36,7 +35,6 @@ private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationT
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = throw new IllegalArgumentException
}

@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.STAGING_DIR

class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers {
test("hadoopFSsToAccess should return defaultFS even if not configured") {
val sparkConf = new SparkConf()
val defaultFS = "hdfs://localhost:8020"
    val stagingDir = "hdfs://localhost:8021"
sparkConf.set("spark.master", "yarn-client")
    sparkConf.set(STAGING_DIR, stagingDir)
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", defaultFS)
val expected = Set(
new Path(defaultFS).getFileSystem(hadoopConf),
      new Path(stagingDir).getFileSystem(hadoopConf)
)
val result = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
result should be (expected)
}

test("SPARK-24149: retrieve all namenodes from HDFS") {
val sparkConf = new SparkConf()
sparkConf.set("spark.master", "yarn-client")
val basicFederationConf = new Configuration()
basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
basicFederationConf.set("dfs.nameservices", "ns1,ns2")
basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020")
basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021")
val basicFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf),
new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf))
val basicFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, basicFederationConf)
basicFederationResult should be (basicFederationExpected)

    // When ViewFS is enabled, it manages the mounted namespaces itself, so they need no separate handling here.
val viewFsConf = new Configuration()
viewFsConf.addResource(basicFederationConf)
viewFsConf.set("fs.defaultFS", "viewfs://clusterX/")
viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/")
val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf))
HadoopFSDelegationTokenProvider
.hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected)

// invalid config should not throw NullPointerException
val invalidFederationConf = new Configuration()
invalidFederationConf.addResource(basicFederationConf)
invalidFederationConf.unset("dfs.namenode.rpc-address.ns2")
val invalidFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf))
val invalidFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, invalidFederationConf)
invalidFederationResult should be (invalidFederationExpected)

    // no nameservices defined, i.e. the non-federated (old) case
val noFederationConf = new Configuration()
noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020")
val noFederationExpected = Set(
new Path("hdfs://localhost:8020").getFileSystem(noFederationConf))
val noFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf,
noFederationConf)
noFederationResult should be (noFederationExpected)

// federation and HA enabled
val federationAndHAConf = new Configuration()
federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA")
federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA")
federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2")
federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022")
federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023")
federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")
federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")

val federationAndHAExpected = Set(
new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf),
new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf))
val federationAndHAResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(
sparkConf, federationAndHAConf)
federationAndHAResult should be (federationAndHAExpected)
}
}
18 changes: 1 addition & 17 deletions docs/running-on-yarn.md
@@ -490,10 +490,6 @@ for:
filesystem if `spark.yarn.stagingDir` is not set);
- if Hadoop federation is enabled, all the federated filesystems in the configuration.

If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
explicitly provided to Spark at launch time. This is done by listing them in the
`spark.yarn.access.hadoopFileSystems` property, described in the configuration section below.

The YARN integration also supports custom delegation token providers using the Java Services
mechanism (see `java.util.ServiceLoader`). Implementations of
`org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark
@@ -527,18 +523,6 @@ providers can be disabled individually by setting `spark.security.credentials.{s
<br /> (Works also with the "local" master.)
</td>
</tr>
<tr>
<td><code>spark.yarn.access.hadoopFileSystems</code></td>
<td>(none)</td>
<td>
A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For
example, <code>spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
and Kerberos must be properly configured to be able to access them (either in the same realm
or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
the Spark application can access those remote Hadoop filesystems.
</td>
</tr>
<tr>
<td><code>spark.yarn.kerberos.relogin.period</code></td>
<td>1m</td>
@@ -644,7 +628,7 @@ spark.security.credentials.hive.enabled false
spark.security.credentials.hbase.enabled false
```

The configuration option `spark.yarn.access.hadoopFileSystems` must be unset.
The configuration option `spark.kerberos.access.hadoopFileSystems` must be unset.

# Using the Spark History Server to replace the Spark Web UI

16 changes: 16 additions & 0 deletions docs/security.md
@@ -752,6 +752,10 @@ configuration has Kerberos authentication turned (`hbase.security.authentication`
Similarly, a Hive token will be obtained if Hive is in the classpath, and the configuration includes
URIs for remote metastore services (`hive.metastore.uris` is not empty).

If an application needs to interact with other secure Hadoop filesystems, their URIs need to be
explicitly provided to Spark at launch time. This is done by listing them in the
`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below.

Delegation token support is currently only supported in YARN and Mesos modes. Consult the
deployment-specific page for more information.

@@ -769,6 +773,18 @@ The following options provides finer-grained control for this feature:
application being run.
</td>
</tr>
<tr>
<td><code>spark.kerberos.access.hadoopFileSystems</code></td>
<td>(none)</td>
<td>
A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For
example, <code>spark.kerberos.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032,
webhdfs://nn3.com:50070</code>. The Spark application must have access to the filesystems listed
and Kerberos must be properly configured to be able to access them (either in the same realm
or in a trusted realm). Spark acquires security tokens for each of the filesystems so that
the Spark application can access those remote Hadoop filesystems.
</td>
</tr>
</table>

## Long-Running Applications
@@ -21,7 +21,6 @@ import scala.language.existentials
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}

@@ -38,7 +37,6 @@ private[spark] class KafkaDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
logDebug("Attempting to fetch Kafka security token.")