@@ -34,10 +34,11 @@ import com.google.common.base.Objects
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.{DataOutputBuffer, Text}
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringUtils
import org.apache.hadoop.util.VersionInfo
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords._
@@ -60,7 +61,7 @@ import org.apache.spark.internal.config.Python._
import org.apache.spark.launcher.{JavaModuleOptions, LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.util.{CallerContext, Utils, YarnContainerInfoHelper}
import org.apache.spark.util.{CallerContext, Utils, VersionUtils, YarnContainerInfoHelper}

private[spark] class Client(
val args: ClientArguments,
@@ -76,6 +77,10 @@ private[spark] class Client(

private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"

// ContainerLaunchContext.setTokensConf is only available in Hadoop 2.9+ and 3.x, so here we use
// reflection to avoid a compilation error with the Hadoop 2.7 profile.
private val SET_TOKENS_CONF_METHOD = "setTokensConf"

private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
private var appMaster: ApplicationMaster = _
private var stagingDirPath: Path = _
@@ -340,6 +345,45 @@ private[spark] class Client(
amContainer.setTokens(ByteBuffer.wrap(serializedCreds))
}

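As an aside on the context line above: the ByteBuffer handed to setTokens is just Hadoop Credentials in serialized form. A minimal sketch of producing one, assuming a populated Credentials instance (serializeCredentials is a hypothetical helper, not code from this file):

```scala
import java.nio.ByteBuffer

import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.Credentials

// Flatten delegation tokens and secret keys into the ByteBuffer form that
// ContainerLaunchContext.setTokens expects.
def serializeCredentials(creds: Credentials): ByteBuffer = {
  val dob = new DataOutputBuffer()
  creds.writeTokenStorageToStream(dob)
  ByteBuffer.wrap(dob.getData, 0, dob.getLength)
}
```
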
/**
* Set configurations sent from AM to RM for renewing delegation tokens.
*/
private def setTokenConf(amContainer: ContainerLaunchContext): Unit = {
// SPARK-37205: this regex is used to grep a list of configurations and send them to YARN RM
// for fetching delegation tokens. See YARN-5910 for more details.
val regex = sparkConf.get(config.AM_TOKEN_CONF_REGEX)
// The feature is only supported in Hadoop 2.9+ and 3.x, hence the check below.
val isSupported = VersionUtils.majorMinorVersion(VersionInfo.getVersion) match {
case (2, n) if n >= 9 => true
case (3, _) => true
case _ => false
}
if (regex.nonEmpty && isSupported) {
logInfo(s"Processing token conf (spark.yarn.am.tokenConfRegex) with regex $regex")
val dob = new DataOutputBuffer()
val copy = new Configuration(false)
copy.clear()
hadoopConf.asScala.foreach { entry =>
if (entry.getKey.matches(regex.get)) {
copy.set(entry.getKey, entry.getValue)
logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}")
}
}
copy.write(dob)

[Review thread on the Hadoop version check]
Contributor: Are we doing this only for 3.x? If not, relax the isHadoop3 condition?
Author: Yes, since Spark only supports built-in Hadoop 2.7 or 3.3, we have the check here. Do you mean supporting custom Hadoop versions 2.9+ too, with -Phadoop.version=2.9.x?
Contributor: Exactly - both 2.9 and 2.10, for example.
Author: Got it. I added the change.
Author: Gentle ping @mridulm. Does the latest change look good to you?

// since this method was added in Hadoop 2.9 and 3.0, we use reflection here to avoid
// a compilation error for the Hadoop 2.7 profile.
val setTokensConfMethod = try {
amContainer.getClass.getMethod(SET_TOKENS_CONF_METHOD, classOf[ByteBuffer])
} catch {
case _: NoSuchMethodException =>
throw new SparkException(s"Cannot find setTokensConf method in ${amContainer.getClass}." +
" Please check the YARN version and make sure it is 2.9+ or 3.x.")
}
setTokensConfMethod.invoke(amContainer, ByteBuffer.wrap(dob.getData))
}
}
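
As context for the reflective call above, here is a minimal, self-contained sketch of the same pattern. The object and method names are hypothetical, not part of this patch, and the ad-hoc major/minor parsing stands in for Spark's internal VersionUtils.majorMinorVersion:

```scala
import java.nio.ByteBuffer

import org.apache.hadoop.util.VersionInfo
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext

object TokensConfSketch {
  // Gate on the Hadoop major/minor version (2.9+ or 3.x), then look the setter
  // up reflectively so this still compiles against a Hadoop 2.7 classpath.
  def trySetTokensConf(ctx: ContainerLaunchContext, tokensConf: ByteBuffer): Boolean = {
    // Assumes a "major.minor.patch"-style version string, e.g. "2.10.1" or "3.3.1".
    val Array(major, minor) = VersionInfo.getVersion.split("\\.").take(2).map(_.toInt)
    val supported = (major == 2 && minor >= 9) || major >= 3
    if (supported) {
      // Method.invoke takes the receiver first, then the declared parameters.
      ctx.getClass.getMethod("setTokensConf", classOf[ByteBuffer]).invoke(ctx, tokensConf)
    }
    supported
  }
}
```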

/** Get the application report from the ResourceManager for an application we have submitted. */
def getApplicationReport(appId: ApplicationId): ApplicationReport =
yarnClient.getApplicationReport(appId)
@@ -1084,6 +1128,7 @@ private[spark] class Client(
amContainer.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
setupSecurityToken(amContainer)
setTokenConf(amContainer)
amContainer
}

@@ -78,6 +78,23 @@ package object config extends Logging {
.booleanConf
.createWithDefault(false)

private[spark] val AM_TOKEN_CONF_REGEX =
ConfigBuilder("spark.yarn.am.tokenConfRegex")
.doc("This config is only supported when Hadoop version is 2.9+ or 3.x (e.g., when using " +
"the Hadoop 3.x profile). The value of this config is a regex expression used to grep a " +
"list of config entries from the job's configuration file (e.g., hdfs-site.xml) and send " +
"to RM, which uses them when renewing delegation tokens. A typical use case of this " +
"feature is to support delegation tokens in an environment where a YARN cluster needs to " +
"talk to multiple downstream HDFS clusters, where the YARN RM may not have configs " +
"(e.g., dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*) to connect to " +
"these clusters. In this scenario, Spark users can specify the config value to be " +
"'^dfs.nameservices$|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$' to parse " +
"these HDFS configs from the job's local configuration files. This config is very " +
"similar to 'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.")
.version("3.3.0")
.stringConf
.createOptional
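
As a usage sketch for the multi-cluster example in the doc text (the exact pattern depends on which HDFS configs the remote RM is missing):

```scala
import org.apache.spark.SparkConf

// Forward the HA NameNode configs from the doc example to the YARN RM so it
// can renew delegation tokens against the downstream HDFS clusters.
val conf = new SparkConf()
  .set("spark.yarn.am.tokenConfRegex",
    "^dfs.nameservices$|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$")
```

Since the keys are matched with String.matches, which already requires a whole-string match, the leading ^ and trailing $ anchors are technically redundant, but they make the intent explicit.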

private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
.doc("Interval after which Executor failures will be considered independent and not " +