Skip to content

Commit 37c430c

Browse files
committed
address comments
1 parent 8c6e5b8 commit 37c430c

File tree

2 files changed

+28
-14
lines changed

2 files changed

+28
-14
lines changed

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ private[spark] class Client(
7676

7777
private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
7878

79+
// ContainerLaunchContext.setTokensConf is only available in Hadoop 2.9+ and 3.x, so here we use
80+
// reflection to avoid compilation errors for the Hadoop 2.7 profile.
81+
private val SET_TOKENS_CONF_METHOD = "setTokensConf"
82+
7983
private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
8084
private var appMaster: ApplicationMaster = _
8185
private var stagingDirPath: Path = _
@@ -348,19 +352,29 @@ private[spark] class Client(
348352
// for fetching delegation tokens. See YARN-5910 for more details.
349353
// The feature is only supported in Hadoop 3.x and up, hence the check below.
350354
val regex = sparkConf.get(config.AM_SEND_TOKEN_CONF)
351-
if (regex != null && regex.nonEmpty && VersionUtils.isHadoop3) {
355+
if (regex.nonEmpty && VersionUtils.isHadoop3) {
352356
logInfo(s"Processing token conf (spark.yarn.am.sendTokenConf) with regex ${regex.get}")
353357
val dob = new DataOutputBuffer();
354358
val copy = new Configuration(false);
355359
copy.clear();
356360
hadoopConf.asScala.foreach { entry =>
357-
if (entry.getKey.matches(regex)) {
361+
if (entry.getKey.matches(regex.get)) {
358362
copy.set(entry.getKey, entry.getValue)
359363
logInfo(s"Captured key: ${entry.getKey} -> value: ${entry.getValue}")
360364
}
361365
}
362366
copy.write(dob);
363-
amContainer.setTokensConf(ByteBuffer.wrap(dob.getData))
367+
368+
// since this method was added in Hadoop 2.9 and 3.0, we use reflection here to avoid
369+
// compilation errors for the Hadoop 2.7 profile.
370+
val setTokensConfMethod = try {
371+
amContainer.getClass.getMethod(SET_TOKENS_CONF_METHOD, classOf[ByteBuffer])
372+
} catch {
373+
case _: NoSuchMethodException =>
374+
throw new SparkException(s"Cannot find setTokensConf method in ${amContainer.getClass}." +
375+
s" Please check YARN version and make sure it is 2.9+ or 3.x")
376+
}
377+
setTokensConfMethod.invoke(amContainer, ByteBuffer.wrap(dob.getData))
364378
}
365379
}
366380

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,17 @@ package object config extends Logging {
8080

8181
private[spark] val AM_SEND_TOKEN_CONF =
8282
ConfigBuilder("spark.yarn.am.sendTokenConf")
83-
.doc("The value of this config is a regex expression used to grep a list of " +
84-
"config entries from the job's configuration file (e.g., hdfs-site.xml) and send to " +
85-
"RM, which uses them when renewing delegation tokens. A typical use case of " +
86-
"this feature is to support delegation tokens in an environment where a YARN cluster " +
87-
"needs to talk to multiple downstream HDFS clusters, where the YARN RM may not have " +
88-
"configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, dfs.namenode.rpc-address.*)" +
89-
"to connect to these clusters. This config is very similar to " +
90-
"'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.")
91-
.version("3.3.0")
92-
.stringConf
93-
.createWithDefault("")
83+
.doc("This config is only supported for Hadoop 3.x profile. The value of this config is a " +
84+
"regular expression used to grep a list of config entries from the job's configuration " +
85+
"file (e.g., hdfs-site.xml) and send to RM, which uses them when renewing delegation " +
86+
"tokens. A typical use case of this feature is to support delegation tokens in an " +
87+
"environment where a YARN cluster needs to talk to multiple downstream HDFS clusters, " +
88+
"where the YARN RM may not have configs (e.g., dfs.nameservices, dfs.ha.namenodes.*, " +
89+
"dfs.namenode.rpc-address.*) to connect to these clusters. This config is very similar " +
90+
"to 'mapreduce.job.send-token-conf'. Please check YARN-5910 for more details.")
91+
.version("3.3.0")
92+
.stringConf
93+
.createOptional
9494

9595
private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
9696
ConfigBuilder("spark.yarn.executor.failuresValidityInterval")

0 commit comments

Comments
 (0)