diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ed54044a13ff..ce44ed6a85ae 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -34,10 +34,11 @@ import com.google.common.base.Objects
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.{DataOutputBuffer, Text}
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringUtils
+import org.apache.hadoop.util.VersionInfo
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
@@ -60,7 +61,7 @@ import org.apache.spark.internal.config.Python._
 import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEnv
-import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}
+import org.apache.spark.util.{CallerContext, Utils, VersionUtils, YarnContainerInfoHelper}
 
 private[spark] class Client(
     val args: ClientArguments,
@@ -76,6 +77,10 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
+  // ContainerLaunchContext.setTokensConf is only available in Hadoop 2.9+ and 3.x, so here we
+  // use reflection to avoid a compilation error against the Hadoop 2.7 profile.
+  private val SET_TOKENS_CONF_METHOD = "setTokensConf"
+
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
   private var appMaster: ApplicationMaster = _
   private var stagingDirPath: Path = _
@@ -340,6 +345,45 @@ private[spark] class Client(
     amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
   }
 
+  /**
+   * Set configurations sent from AM to RM for renewing delegation tokens.
+   */
+  private def setTokenConf(amContainer: ContainerLaunchContext): Unit = {
+    // SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM
+    // for fetching delegation tokens. See YARN-5910 for more details.
+    val regex = sparkConf.get(config.AM_TOKEN_CONF_REGEX)
+    // The feature is only supported in Hadoop 2.9+ and 3.x, hence the check below.
+    val isSupported = VersionUtils.majorMinorVersion(VersionInfo.getVersion) match {
+      case (2, n) if n >= 9 => true
+      case (3, _) => true
+      case _ => false
+    }
+    if (regex.nonEmpty && isSupported) {
+      logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with regex ${regex.get}")
+      val dob = new DataOutputBuffer()
+      val copy = new Configuration(false)
+      copy.clear()
+      hadoopConf.asScala.foreach { entry =>
+        if (entry.getKey.matches(regex.get)) {
+          copy.set(entry.getKey, entry.getValue)
+          logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}")
+        }
+      }
+      copy.write(dob)
+
+      // Since this method was added in Hadoop 2.9 and 3.0, we use reflection here to avoid a
+      // compilation error against the Hadoop 2.7 profile.
+      val setTokensConfMethod = try {
+        amContainer.getClass.getMethod(SET_TOKENS_CONF_METHOD, classOf[ByteBuffer])
+      } catch {
+        case _: NoSuchMethodException =>
+          throw new SparkException(s"Cannot find setTokensConf method in " +
+            s"${amContainer.getClass}. Please check YARN version and make sure it is 2.9+ or 3.x")
+      }
+      setTokensConfMethod.invoke(amContainer, ByteBuffer.wrap(dob.getData))
+    }
+  }
+
   /** Get the application report from the ResourceManager for an application we have submitted. */
   def getApplicationReport(appId: ApplicationId): ApplicationReport =
     yarnClient.getApplicationReport(appId)
@@ -1084,6 +1128,7 @@ private[spark] class Client(
     amContainer.setApplicationACLs(
       YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
     setupSecurityToken(amContainer)
+    setTokenConf(amContainer)
 
     amContainer
   }
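A quick illustration of the payload built by setTokenConf above (a standalone sketch, not part of the patch; the key and value are hypothetical): the matched entries are copied into a fresh Hadoop Configuration and serialized with DataOutputBuffer, and per YARN-5910 the RM reconstructs them from those same bytes, which Configuration.readFields can round-trip:

import java.io.{ByteArrayInputStream, DataInputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DataOutputBuffer

// Serialize a filtered Configuration the same way setTokenConf does.
val copy = new Configuration(false)
copy.set("dfs.nameservices", "ns1,ns2") // hypothetical captured entry
val dob = new DataOutputBuffer()
copy.write(dob)

// Read the payload back, mirroring the deserialization on the RM side.
val restored = new Configuration(false)
restored.readFields(new DataInputStream(new ByteArrayInputStream(dob.getData, 0, dob.getLength)))
assert(restored.get("dfs.nameservices") == "ns1,ns2")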
+ + s" Please check YARN version and make sure it is 2.9+ or 3.x") + } + setTokensConfMethod.invoke(ByteBuffer.wrap(dob.getData)) + } + } + /** Get the application report from the ResourceManager for an application we have submitted. */ def getApplicationReport(appId: ApplicationId): ApplicationReport = yarnClient.getApplicationReport(appId) @@ -1084,6 +1128,7 @@ private[spark] class Client( amContainer.setApplicationACLs( YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava) setupSecurityToken(amContainer) + setTokenConf(amContainer) amContainer } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 1270f1e4a666..838c42d8e55e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -78,6 +78,23 @@ package object config extends Logging { .booleanConf .createWithDefault(false) + private[spark] val AM_TOKEN_CONF_REGEX = + ConfigBuilder("spark.yarn.am.tokenConfRegex") + .doc("This config is only supported when Hadoop version is 2.9+ or 3.x (e.g., when using " + + "the Hadoop 3.x profile). The value of this config is a regex expression used to grep a " + + "list of config entries from the job's configuration file (e.g., hdfs-site.xml) and send " + + "to RM, which uses them when renewing delegation tokens. A typical use case of this " + + "feature is to support delegation tokens in an environment where a YARN cluster needs to " + + "talk to multiple downstream HDFS clusters, where the YARN RM may not have configs " + + "(e.g., dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*) to connect to " + + "these clusters. In this scenario, Spark users can specify the config value to be " + + "'^dfs.nameservices$|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$' to parse " + + "these HDFS configs from the job's local configuration files. This config is very " + + "similar to 'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.") + .version("3.3.0") + .stringConf + .createOptional + private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.yarn.executor.failuresValidityInterval") .doc("Interval after which Executor failures will be considered independent and not " +