From 69646eb8481ad1447495539c6ff5585fa83e74f6 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 30 Jan 2019 10:57:45 +0100 Subject: [PATCH 1/6] [SPARK-26766][CORE] Remove the list of filesystems from HadoopDelegationTokenProvider.obtainDelegationTokens --- .../HBaseDelegationTokenProvider.scala | 1 - .../HadoopDelegationTokenManager.scala | 10 +- .../HadoopDelegationTokenProvider.scala | 2 - .../HadoopFSDelegationTokenProvider.scala | 46 +++++++++- .../spark/internal/config/package.scala | 18 ++++ .../HadoopDelegationTokenManagerSuite.scala | 1 - ...HadoopFSDelegationTokenProviderSuite.scala | 92 +++++++++++++++++++ .../KafkaDelegationTokenProvider.scala | 1 - .../deploy/yarn/YarnSparkHadoopUtil.scala | 41 +-------- .../org/apache/spark/deploy/yarn/config.scala | 19 ---- .../YARNHadoopDelegationTokenManager.scala | 6 -- .../yarn/YarnSparkHadoopUtilSuite.scala | 66 ------------- .../HiveDelegationTokenProvider.scala | 1 - 13 files changed, 156 insertions(+), 148 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala index e345b0bbffeae..d53eb4efe529b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HBaseDelegationTokenProvider.scala @@ -39,7 +39,6 @@ private[security] class HBaseDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, sparkConf: SparkConf, - fileSystems: Set[FileSystem], creds: Credentials): Option[Long] = { try { val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala index 487291e176059..6a18a8dd33d1a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala @@ -144,7 +144,7 @@ private[spark] class HadoopDelegationTokenManager( def obtainDelegationTokens(creds: Credentials): Long = { delegationTokenProviders.values.flatMap { provider => if (provider.delegationTokensRequired(sparkConf, hadoopConf)) { - provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds) + provider.obtainDelegationTokens(hadoopConf, sparkConf, creds) } else { logDebug(s"Service ${provider.serviceName} does not require a token." + s" Check your configuration to see if security is disabled or not.") @@ -181,14 +181,6 @@ private[spark] class HadoopDelegationTokenManager( .getOrElse(isEnabledDeprecated) } - /** - * List of file systems for which to obtain delegation tokens. The base implementation - * returns just the default file system in the given Hadoop configuration. - */ - protected def fileSystemsToAccess(): Set[FileSystem] = { - Set(FileSystem.get(hadoopConf)) - } - private def scheduleRenewal(delay: Long): Unit = { val _delay = math.max(0, delay) logInfo(s"Scheduling renewal in ${UIUtils.formatDuration(delay)}.") diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala index cb4c97bdc4c1a..3dc952d54e730 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala @@ -44,13 +44,11 @@ private[spark] trait HadoopDelegationTokenProvider { * Obtain delegation tokens for this service and get the time of the next renewal. * @param hadoopConf Configuration of current Hadoop Compatible system. * @param creds Credentials to add tokens and security keys to. - * @param fileSystems List of file systems for which to obtain delegation tokens. * @return If the returned tokens are renewable and can be renewed, return the time of the next * renewal, otherwise None should be returned. */ def obtainDelegationTokens( hadoopConf: Configuration, sparkConf: SparkConf, - fileSystems: Set[FileSystem], creds: Credentials): Option[Long] } diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 3bc40de776ec9..4ccf27cb3de7b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -22,7 +22,7 @@ import scala.util.Try import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.Master import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier @@ -44,9 +44,9 @@ private[deploy] class HadoopFSDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, sparkConf: SparkConf, - fileSystems: Set[FileSystem], creds: Credentials): Option[Long] = { try { + val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf) val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds) // Get the token renewal interval if it is not set. It will only be called once. @@ -133,3 +133,45 @@ private[deploy] class HadoopFSDelegationTokenProvider if (renewIntervals.isEmpty) None else Some(renewIntervals.min) } } + +private[deploy] object HadoopFSDelegationTokenProvider { + def hadoopFSsToAccess( + sparkConf: SparkConf, + hadoopConf: Configuration): Set[FileSystem] = { + val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) + val requestAllDelegationTokens = filesystemsToAccess.isEmpty + + val master = sparkConf.get("spark.master") + val stagingFS = if (master != null && master.contains("yarn")) { + sparkConf.get(STAGING_DIR) + .map(new Path(_).getFileSystem(hadoopConf)) + .getOrElse(FileSystem.get(hadoopConf)) + } else { + FileSystem.get(hadoopConf) + } + + // Add the list of available namenodes for all namespaces in HDFS federation. + // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its + // namespaces. + val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") { + filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet + } else { + val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") + // Retrieving the filesystem for the nameservices where HA is not enabled + val filesystemsWithoutHA = nameservices.flatMap { ns => + Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode => + new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf) + } + } + // Retrieving the filesystem for the nameservices where HA is enabled + val filesystemsWithHA = nameservices.flatMap { ns => + Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ => + new Path(s"hdfs://$ns").getFileSystem(hadoopConf) + } + } + (filesystemsWithoutHA ++ filesystemsWithHA).toSet + } + + hadoopFilesystems + stagingFS + } +} diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 66b1ff76446aa..9ddb6058ff821 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -364,6 +364,19 @@ package object config { .checkValues(Set("keytab", "ccache")) .createWithDefault("keytab") + private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.kerberos.access.namenodes") + .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + + "fs.defaultFS does not need to be listed here.") + .stringConf + .toSequence + .createWithDefault(Nil) + + private[spark] val FILESYSTEMS_TO_ACCESS = + ConfigBuilder("spark.kerberos.access.hadoopFileSystems") + .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + + "that hosts fs.defaultFS does not need to be listed here.") + .fallbackConf(NAMENODES_TO_ACCESS) + private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") .intConf .createOptional @@ -1253,4 +1266,9 @@ package object config { ConfigBuilder("spark.speculation.quantile") .doubleConf .createWithDefault(0.75) + + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .createOptional } diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 9cc4a914b90e4..95cb48a54f164 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -36,7 +36,6 @@ private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationT override def obtainDelegationTokens( hadoopConf: Configuration, sparkConf: SparkConf, - fileSystems: Set[FileSystem], creds: Credentials): Option[Long] = throw new IllegalArgumentException } diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala new file mode 100644 index 0000000000000..184e13297d9e8 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.security + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.Matchers + +import org.apache.spark.{SparkConf, SparkFunSuite} + +class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers { + test("SPARK-24149: retrieve all namenodes from HDFS") { + val sparkConf = new SparkConf() + sparkConf.set("spark.master", "yarn-client") + val basicFederationConf = new Configuration() + basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020") + basicFederationConf.set("dfs.nameservices", "ns1,ns2") + basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020") + basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021") + val basicFederationExpected = Set( + new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf), + new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf)) + val basicFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess( + sparkConf, basicFederationConf) + basicFederationResult should be (basicFederationExpected) + + // when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them + val viewFsConf = new Configuration() + viewFsConf.addResource(basicFederationConf) + viewFsConf.set("fs.defaultFS", "viewfs://clusterX/") + viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/") + val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf)) + HadoopFSDelegationTokenProvider + .hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected) + + // invalid config should not throw NullPointerException + val invalidFederationConf = new Configuration() + invalidFederationConf.addResource(basicFederationConf) + invalidFederationConf.unset("dfs.namenode.rpc-address.ns2") + val invalidFederationExpected = Set( + new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf)) + val invalidFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess( + sparkConf, invalidFederationConf) + invalidFederationResult should be (invalidFederationExpected) + + // no namespaces defined, ie. old case + val noFederationConf = new Configuration() + noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020") + val noFederationExpected = Set( + new Path("hdfs://localhost:8020").getFileSystem(noFederationConf)) + val noFederationResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, + noFederationConf) + noFederationResult should be (noFederationExpected) + + // federation and HA enabled + val federationAndHAConf = new Configuration() + federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA") + federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA") + federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2") + federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2") + federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020") + federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021") + federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022") + federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023") + federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider") + federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA", + "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider") + + val federationAndHAExpected = Set( + new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf), + new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf)) + val federationAndHAResult = HadoopFSDelegationTokenProvider.hadoopFSsToAccess( + sparkConf, federationAndHAConf) + federationAndHAResult should be (federationAndHAExpected) + } +} diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala index ac6baa09e0224..36ef1666f3b8d 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala @@ -38,7 +38,6 @@ private[spark] class KafkaDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, sparkConf: SparkConf, - fileSystems: Set[FileSystem], creds: Credentials): Option[Long] = { try { logDebug("Attempting to fetch Kafka security token.") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 3a3272216294f..b904687dafafb 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -21,14 +21,11 @@ import java.util.regex.{Matcher, Pattern} import scala.collection.mutable.{HashMap, ListBuffer} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.deploy.yarn.config._ +import org.apache.spark.SecurityManager import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils @@ -185,40 +182,4 @@ object YarnSparkHadoopUtil { ConverterUtils.toContainerId(containerIdString) } - /** The filesystems for which YARN should fetch delegation tokens. */ - def hadoopFSsToAccess( - sparkConf: SparkConf, - hadoopConf: Configuration): Set[FileSystem] = { - val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) - val requestAllDelegationTokens = filesystemsToAccess.isEmpty - - val stagingFS = sparkConf.get(STAGING_DIR) - .map(new Path(_).getFileSystem(hadoopConf)) - .getOrElse(FileSystem.get(hadoopConf)) - - // Add the list of available namenodes for all namespaces in HDFS federation. - // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its - // namespaces. - val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") { - filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet - } else { - val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") - // Retrieving the filesystem for the nameservices where HA is not enabled - val filesystemsWithoutHA = nameservices.flatMap { ns => - Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode => - new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf) - } - } - // Retrieving the filesystem for the nameservices where HA is enabled - val filesystemsWithHA = nameservices.flatMap { ns => - Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ => - new Path(s"hdfs://$ns").getFileSystem(hadoopConf) - } - } - (filesystemsWithoutHA ++ filesystemsWithHA).toSet - } - - hadoopFilesystems + stagingFS - } - } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 16adaec04802e..bd5e136151650 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -122,11 +122,6 @@ package object config { .intConf .createOptional - private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") - .doc("Staging directory used while submitting applications.") - .stringConf - .createOptional - /* Launcher configuration. */ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") @@ -244,20 +239,6 @@ package object config { .booleanConf .createWithDefault(false) - /* Security configuration. */ - - private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.yarn.access.namenodes") - .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + - "fs.defaultFS does not need to be listed here.") - .stringConf - .toSequence - .createWithDefault(Nil) - - private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") - .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + - "that hosts fs.defaultFS does not need to be listed here.") - .fallbackConf(NAMENODES_TO_ACCESS) - /* Rolled log aggregation configuration. */ private[spark] val ROLLED_LOG_INCLUDE_PATTERN = diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala index bb40ea8015198..fc1f75254c578 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManager.scala @@ -22,12 +22,10 @@ import java.util.ServiceLoader import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials import org.apache.spark.SparkConf import org.apache.spark.deploy.security.HadoopDelegationTokenManager -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -74,8 +72,4 @@ private[spark] class YARNHadoopDelegationTokenManager( credentialProviders.contains(serviceName) || super.isProviderLoaded(serviceName) } - override protected def fileSystemsToAccess(): Set[FileSystem] = { - YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, hadoopConf) - } - } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index de7ff8238c9e7..e7cde03a01b46 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -21,8 +21,6 @@ import java.io.{File, IOException} import java.nio.charset.StandardCharsets import com.google.common.io.{ByteStreams, Files} -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.Matchers @@ -30,7 +28,6 @@ import org.scalatest.Matchers import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ import org.apache.spark.internal.config.UI._ import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -143,67 +140,4 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } } - - test("SPARK-24149: retrieve all namenodes from HDFS") { - val sparkConf = new SparkConf() - val basicFederationConf = new Configuration() - basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020") - basicFederationConf.set("dfs.nameservices", "ns1,ns2") - basicFederationConf.set("dfs.namenode.rpc-address.ns1", "localhost:8020") - basicFederationConf.set("dfs.namenode.rpc-address.ns2", "localhost:8021") - val basicFederationExpected = Set( - new Path("hdfs://localhost:8020").getFileSystem(basicFederationConf), - new Path("hdfs://localhost:8021").getFileSystem(basicFederationConf)) - val basicFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess( - sparkConf, basicFederationConf) - basicFederationResult should be (basicFederationExpected) - - // when viewfs is enabled, namespaces are handled by it, so we don't need to take care of them - val viewFsConf = new Configuration() - viewFsConf.addResource(basicFederationConf) - viewFsConf.set("fs.defaultFS", "viewfs://clusterX/") - viewFsConf.set("fs.viewfs.mounttable.clusterX.link./home", "hdfs://localhost:8020/") - val viewFsExpected = Set(new Path("viewfs://clusterX/").getFileSystem(viewFsConf)) - YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, viewFsConf) should be (viewFsExpected) - - // invalid config should not throw NullPointerException - val invalidFederationConf = new Configuration() - invalidFederationConf.addResource(basicFederationConf) - invalidFederationConf.unset("dfs.namenode.rpc-address.ns2") - val invalidFederationExpected = Set( - new Path("hdfs://localhost:8020").getFileSystem(invalidFederationConf)) - val invalidFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess( - sparkConf, invalidFederationConf) - invalidFederationResult should be (invalidFederationExpected) - - // no namespaces defined, ie. old case - val noFederationConf = new Configuration() - noFederationConf.set("fs.defaultFS", "hdfs://localhost:8020") - val noFederationExpected = Set( - new Path("hdfs://localhost:8020").getFileSystem(noFederationConf)) - val noFederationResult = YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, noFederationConf) - noFederationResult should be (noFederationExpected) - - // federation and HA enabled - val federationAndHAConf = new Configuration() - federationAndHAConf.set("fs.defaultFS", "hdfs://clusterXHA") - federationAndHAConf.set("dfs.nameservices", "clusterXHA,clusterYHA") - federationAndHAConf.set("dfs.ha.namenodes.clusterXHA", "x-nn1,x-nn2") - federationAndHAConf.set("dfs.ha.namenodes.clusterYHA", "y-nn1,y-nn2") - federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn1", "localhost:8020") - federationAndHAConf.set("dfs.namenode.rpc-address.clusterXHA.x-nn2", "localhost:8021") - federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn1", "localhost:8022") - federationAndHAConf.set("dfs.namenode.rpc-address.clusterYHA.y-nn2", "localhost:8023") - federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterXHA", - "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider") - federationAndHAConf.set("dfs.client.failover.proxy.provider.clusterYHA", - "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider") - - val federationAndHAExpected = Set( - new Path("hdfs://clusterXHA").getFileSystem(federationAndHAConf), - new Path("hdfs://clusterYHA").getFileSystem(federationAndHAConf)) - val federationAndHAResult = YarnSparkHadoopUtil.hadoopFSsToAccess( - sparkConf, federationAndHAConf) - federationAndHAResult should be (federationAndHAExpected) - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala index 25eb2515a0c56..c0c46187b13af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/security/HiveDelegationTokenProvider.scala @@ -85,7 +85,6 @@ private[spark] class HiveDelegationTokenProvider override def obtainDelegationTokens( hadoopConf: Configuration, sparkConf: SparkConf, - fileSystems: Set[FileSystem], creds: Credentials): Option[Long] = { try { val conf = hiveConf(hadoopConf) From 6eb9ab1531a3d63b10233633a19539f75fea47b5 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 30 Jan 2019 15:40:50 +0100 Subject: [PATCH 2/6] Compile fix + removed couple of unused imports --- .../deploy/security/HadoopDelegationTokenManagerSuite.scala | 1 - .../apache/spark/kafka010/KafkaDelegationTokenProvider.scala | 1 - .../apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 2 +- 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala index 95cb48a54f164..2f36dba05c64e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala @@ -18,7 +18,6 @@ package org.apache.spark.deploy.security import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials import org.apache.spark.{SparkConf, SparkFunSuite} diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala index 36ef1666f3b8d..c69e8a320059d 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaDelegationTokenProvider.scala @@ -21,7 +21,6 @@ import scala.language.existentials import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.security.Credentials import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 463cf1680efde..16cb78a3fdfa9 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -126,7 +126,7 @@ private[hive] object SparkSQLCLIDriver extends Logging { val tokenProvider = new HiveDelegationTokenProvider() if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) { val credentials = new Credentials() - tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, Set.empty, credentials) + tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials) UserGroupInformation.getCurrentUser.addCredentials(credentials) } From 07ff492d3e142605705aa85d3f27c8634b91de07 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 31 Jan 2019 16:47:58 +0100 Subject: [PATCH 3/6] Review fixes: * Config parameter deprecation * Return defaultFS all the time * get("spark.master", null) --- .../main/scala/org/apache/spark/SparkConf.scala | 9 +++++---- .../HadoopFSDelegationTokenProvider.scala | 14 +++++++------- .../apache/spark/internal/config/package.scala | 11 ++++++----- .../scala/org/apache/spark/SparkConfSuite.scala | 3 +++ .../HadoopFSDelegationTokenProviderSuite.scala | 17 +++++++++++++++++ 5 files changed, 38 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 7d49f10d44823..c453bf0144cd2 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -638,7 +638,8 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0", "Not used anymore. Please use spark.shuffle.service.index.cache.size"), DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."), - DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.") + DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."), + DeprecatedConfig("spark.yarn.access.namenodes", "3.0.0", "Not used anymore.") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) @@ -700,8 +701,6 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.akka.frameSize", "1.6")), "spark.yarn.jars" -> Seq( AlternateConfig("spark.yarn.jar", "2.0")), - "spark.yarn.access.hadoopFileSystems" -> Seq( - AlternateConfig("spark.yarn.access.namenodes", "2.2")), MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq( AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")), LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq( @@ -715,7 +714,9 @@ private[spark] object SparkConf extends Logging { PRINCIPAL.key -> Seq( AlternateConfig("spark.yarn.principal", "3.0")), KERBEROS_RELOGIN_PERIOD.key -> Seq( - AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")) + AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")), + KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq( + AlternateConfig(YARN_FILESYSTEMS_TO_ACCESS.key, "3.0")) ) /** diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 4ccf27cb3de7b..d4ae1de61f4d4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -138,22 +138,22 @@ private[deploy] object HadoopFSDelegationTokenProvider { def hadoopFSsToAccess( sparkConf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = { - val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) - val requestAllDelegationTokens = filesystemsToAccess.isEmpty + val filesystemsToAccess = sparkConf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) - val master = sparkConf.get("spark.master") + val defaultFS = FileSystem.get(hadoopConf) + val master = sparkConf.get("spark.master", null) val stagingFS = if (master != null && master.contains("yarn")) { sparkConf.get(STAGING_DIR) .map(new Path(_).getFileSystem(hadoopConf)) - .getOrElse(FileSystem.get(hadoopConf)) + .getOrElse(defaultFS) } else { - FileSystem.get(hadoopConf) + defaultFS } // Add the list of available namenodes for all namespaces in HDFS federation. // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its // namespaces. - val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") { + val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || stagingFS.getScheme == "viewfs") { filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet } else { val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") @@ -172,6 +172,6 @@ private[deploy] object HadoopFSDelegationTokenProvider { (filesystemsWithoutHA ++ filesystemsWithHA).toSet } - hadoopFilesystems + stagingFS + hadoopFilesystems + stagingFS + defaultFS } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 9ddb6058ff821..86ae361a21137 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -364,18 +364,19 @@ package object config { .checkValues(Set("keytab", "ccache")) .createWithDefault("keytab") - private[spark] val NAMENODES_TO_ACCESS = ConfigBuilder("spark.kerberos.access.namenodes") - .doc("Extra NameNode URLs for which to request delegation tokens. The NameNode that hosts " + - "fs.defaultFS does not need to be listed here.") + private[spark] val YARN_FILESYSTEMS_TO_ACCESS = + ConfigBuilder("spark.yarn.access.hadoopFileSystems") + .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + + "that hosts fs.defaultFS does not need to be listed here.") .stringConf .toSequence .createWithDefault(Nil) - private[spark] val FILESYSTEMS_TO_ACCESS = + private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.kerberos.access.hadoopFileSystems") .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + "that hosts fs.defaultFS does not need to be listed here.") - .fallbackConf(NAMENODES_TO_ACCESS) + .fallbackConf(YARN_FILESYSTEMS_TO_ACCESS) private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") .intConf diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 079170d47d0b2..89c16927aa5ce 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -264,6 +264,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.set("spark.scheduler.listenerbus.eventqueue.size", "84") assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84) + + conf.set("spark.yarn.access.hadoopFileSystems", "testNode") + assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode")) } test("akka deprecated configs") { diff --git a/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala index 184e13297d9e8..0f1a9168e2542 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProviderSuite.scala @@ -22,8 +22,25 @@ import org.apache.hadoop.fs.Path import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.internal.config.STAGING_DIR class HadoopFSDelegationTokenProviderSuite extends SparkFunSuite with Matchers { + test("hadoopFSsToAccess should return defaultFS even if not configured") { + val sparkConf = new SparkConf() + val defaultFS = "hdfs://localhost:8020" + val statingDir = "hdfs://localhost:8021" + sparkConf.set("spark.master", "yarn-client") + sparkConf.set(STAGING_DIR, statingDir) + val hadoopConf = new Configuration() + hadoopConf.set("fs.defaultFS", defaultFS) + val expected = Set( + new Path(defaultFS).getFileSystem(hadoopConf), + new Path(statingDir).getFileSystem(hadoopConf) + ) + val result = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf) + result should be (expected) + } + test("SPARK-24149: retrieve all namenodes from HDFS") { val sparkConf = new SparkConf() sparkConf.set("spark.master", "yarn-client") From e73250d32b5d0e5198d7f2a323d98cfbff0576b9 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 31 Jan 2019 16:56:02 +0100 Subject: [PATCH 4/6] Nit fix --- .../spark/deploy/security/HadoopFSDelegationTokenProvider.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index d4ae1de61f4d4..4eaeaf5b44472 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -147,7 +147,7 @@ private[deploy] object HadoopFSDelegationTokenProvider { .map(new Path(_).getFileSystem(hadoopConf)) .getOrElse(defaultFS) } else { - defaultFS + defaultFS } // Add the list of available namenodes for all namespaces in HDFS federation. From 32c5d5dbe277b7771b6223d98ab5d72671b0d753 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 1 Feb 2019 17:46:32 +0100 Subject: [PATCH 5/6] Review fixes: * Doc update * Param deprecation --- core/src/main/scala/org/apache/spark/SparkConf.scala | 6 +++--- .../org/apache/spark/internal/config/package.scala | 10 ++-------- .../test/scala/org/apache/spark/SparkConfSuite.scala | 3 +++ docs/running-on-yarn.md | 8 ++++---- 4 files changed, 12 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index c453bf0144cd2..e686e079d1a19 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -638,8 +638,7 @@ private[spark] object SparkConf extends Logging { DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0", "Not used anymore. Please use spark.shuffle.service.index.cache.size"), DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."), - DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."), - DeprecatedConfig("spark.yarn.access.namenodes", "3.0.0", "Not used anymore.") + DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) @@ -716,7 +715,8 @@ private[spark] object SparkConf extends Logging { KERBEROS_RELOGIN_PERIOD.key -> Seq( AlternateConfig("spark.yarn.kerberos.relogin.period", "3.0")), KERBEROS_FILESYSTEMS_TO_ACCESS.key -> Seq( - AlternateConfig(YARN_FILESYSTEMS_TO_ACCESS.key, "3.0")) + AlternateConfig("spark.yarn.access.namenodes", "2.2"), + AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0")) ) /** diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 86ae361a21137..3ef6cba8193f6 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -364,20 +364,14 @@ package object config { .checkValues(Set("keytab", "ccache")) .createWithDefault("keytab") - private[spark] val YARN_FILESYSTEMS_TO_ACCESS = - ConfigBuilder("spark.yarn.access.hadoopFileSystems") + private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS = + ConfigBuilder("spark.kerberos.access.hadoopFileSystems") .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + "that hosts fs.defaultFS does not need to be listed here.") .stringConf .toSequence .createWithDefault(Nil) - private[spark] val KERBEROS_FILESYSTEMS_TO_ACCESS = - ConfigBuilder("spark.kerberos.access.hadoopFileSystems") - .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + - "that hosts fs.defaultFS does not need to be listed here.") - .fallbackConf(YARN_FILESYSTEMS_TO_ACCESS) - private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances") .intConf .createOptional diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 89c16927aa5ce..c187bcb577016 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -265,6 +265,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst conf.set("spark.scheduler.listenerbus.eventqueue.size", "84") assert(conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY) === 84) + conf.set("spark.yarn.access.namenodes", "testNode") + assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode")) + conf.set("spark.yarn.access.hadoopFileSystems", "testNode") assert(conf.get(KERBEROS_FILESYSTEMS_TO_ACCESS) === Array("testNode")) } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index a7a448fbeb65e..24f2a8b2ca685 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -492,7 +492,7 @@ for: If an application needs to interact with other secure Hadoop filesystems, their URIs need to be explicitly provided to Spark at launch time. This is done by listing them in the -`spark.yarn.access.hadoopFileSystems` property, described in the configuration section below. +`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below. The YARN integration also supports custom delegation token providers using the Java Services mechanism (see `java.util.ServiceLoader`). Implementations of @@ -528,11 +528,11 @@ providers can be disabled individually by setting `spark.security.credentials.{s - spark.yarn.access.hadoopFileSystems + spark.kerberos.access.hadoopFileSystems (none) A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For - example, spark.yarn.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032, + example, spark.kerberos.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032, webhdfs://nn3.com:50070. The Spark application must have access to the filesystems listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the filesystems so that @@ -644,7 +644,7 @@ spark.security.credentials.hive.enabled false spark.security.credentials.hbase.enabled false ``` -The configuration option `spark.yarn.access.hadoopFileSystems` must be unset. +The configuration option `spark.kerberos.access.hadoopFileSystems` must be unset. # Using the Spark History Server to replace the Spark Web UI From 1c87238cb28fd28b46f1b7362a20a7538a617ba5 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 7 Feb 2019 15:26:50 +0100 Subject: [PATCH 6/6] Review fixes: * Simplified hadoopFSsToAccess * Moved doc to generic area --- .../HadoopFSDelegationTokenProvider.scala | 11 +++++------ docs/running-on-yarn.md | 16 ---------------- docs/security.md | 16 ++++++++++++++++ 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala index 4eaeaf5b44472..725eefbda897b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala @@ -143,17 +143,16 @@ private[deploy] object HadoopFSDelegationTokenProvider { val defaultFS = FileSystem.get(hadoopConf) val master = sparkConf.get("spark.master", null) val stagingFS = if (master != null && master.contains("yarn")) { - sparkConf.get(STAGING_DIR) - .map(new Path(_).getFileSystem(hadoopConf)) - .getOrElse(defaultFS) + sparkConf.get(STAGING_DIR).map(new Path(_).getFileSystem(hadoopConf)) } else { - defaultFS + None } // Add the list of available namenodes for all namespaces in HDFS federation. // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its // namespaces. - val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || stagingFS.getScheme == "viewfs") { + val hadoopFilesystems = if (!filesystemsToAccess.isEmpty || defaultFS.getScheme == "viewfs" || + (stagingFS.isDefined && stagingFS.get.getScheme == "viewfs")) { filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet } else { val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") @@ -172,6 +171,6 @@ private[deploy] object HadoopFSDelegationTokenProvider { (filesystemsWithoutHA ++ filesystemsWithHA).toSet } - hadoopFilesystems + stagingFS + defaultFS + hadoopFilesystems ++ stagingFS + defaultFS } } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 24f2a8b2ca685..026c289cdb9b7 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -490,10 +490,6 @@ for: filesystem if `spark.yarn.stagingDir` is not set); - if Hadoop federation is enabled, all the federated filesystems in the configuration. -If an application needs to interact with other secure Hadoop filesystems, their URIs need to be -explicitly provided to Spark at launch time. This is done by listing them in the -`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below. - The YARN integration also supports custom delegation token providers using the Java Services mechanism (see `java.util.ServiceLoader`). Implementations of `org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` can be made available to Spark @@ -527,18 +523,6 @@ providers can be disabled individually by setting `spark.security.credentials.{s
(Works also with the "local" master.) - - spark.kerberos.access.hadoopFileSystems - (none) - - A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For - example, spark.kerberos.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032, - webhdfs://nn3.com:50070. The Spark application must have access to the filesystems listed - and Kerberos must be properly configured to be able to access them (either in the same realm - or in a trusted realm). Spark acquires security tokens for each of the filesystems so that - the Spark application can access those remote Hadoop filesystems. - - spark.yarn.kerberos.relogin.period 1m diff --git a/docs/security.md b/docs/security.md index a1dc584705a4b..d2cff41eb0f7d 100644 --- a/docs/security.md +++ b/docs/security.md @@ -752,6 +752,10 @@ configuration has Kerberos authentication turned (`hbase.security.authentication Similarly, a Hive token will be obtained if Hive is in the classpath, and the configuration includes URIs for remote metastore services (`hive.metastore.uris` is not empty). +If an application needs to interact with other secure Hadoop filesystems, their URIs need to be +explicitly provided to Spark at launch time. This is done by listing them in the +`spark.kerberos.access.hadoopFileSystems` property, described in the configuration section below. + Delegation token support is currently only supported in YARN and Mesos modes. Consult the deployment-specific page for more information. @@ -769,6 +773,18 @@ The following options provides finer-grained control for this feature: application being run. + + spark.kerberos.access.hadoopFileSystems + (none) + + A comma-separated list of secure Hadoop filesystems your Spark application is going to access. For + example, spark.kerberos.access.hadoopFileSystems=hdfs://nn1.com:8032,hdfs://nn2.com:8032, + webhdfs://nn3.com:50070. The Spark application must have access to the filesystems listed + and Kerberos must be properly configured to be able to access them (either in the same realm + or in a trusted realm). Spark acquires security tokens for each of the filesystems so that + the Spark application can access those remote Hadoop filesystems. + + ## Long-Running Applications