Commit 09fe224
Use token.renew to get token's renewal interval rather than using hdfs-site.xml
Parent: 6963bbc · Commit: 09fe224

8 files changed, +42 −31 lines changed
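At its core, the change measures the renewal interval empirically: renew a freshly obtained delegation token once and take the distance from its issue date to the new expiration, instead of reading `dfs.namenode.delegation.token.renew-interval` out of a possibly stale `hdfs-site.xml`. A minimal sketch of that measurement, mirroring the `getTokenRenewalInterval` helper this commit adds to `Client.scala` below (the populated `Credentials` object and Hadoop configuration are assumed to exist):

import java.io.{ByteArrayInputStream, DataInputStream}

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.security.Credentials

// Sketch: renew an HDFS delegation token (whose renewer is the logged-in user)
// and derive the interval the NameNode actually grants.
def measuredRenewalInterval(creds: Credentials, hadoopConf: Configuration): Long = {
  val token = creds.getAllTokens
    .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
    .head
  val newExpiration = token.renew(hadoopConf) // renew() returns the new expiration time (ms)
  val identifier = new DelegationTokenIdentifier()
  identifier.readFields(new DataInputStream(new ByteArrayInputStream(token.getIdentifier)))
  newExpiration - identifier.getIssueDate     // interval actually granted by the NameNode
}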

core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala

Lines changed: 1 addition & 1 deletion

@@ -70,7 +70,7 @@ private[spark] class ExecutorDelegationTokenUpdater(
       }
       val timeFromNowToRenewal =
         SparkHadoopUtil.get.getTimeFromNowToRenewal(
-          0.8, UserGroupInformation.getCurrentUser.getCredentials)
+          sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
       if (timeFromNowToRenewal <= 0) {
         executorUpdaterRunnable.run()
       } else {

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 6 additions & 7 deletions

@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
-import org.apache.spark._
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
@@ -47,8 +47,6 @@ class SparkHadoopUtil extends Logging {
   private val sparkConf = new SparkConf()
   val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)
-  private lazy val renewalInterval =
-    conf.getLong("dfs.namenode.delegation.token.renew-interval", (24 hours).toMillis)
 
   /**
    * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
@@ -213,9 +211,6 @@ class SparkHadoopUtil extends Logging {
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
    * the respective files.
-   * @param remoteFs
-   * @param prefix
-   * @return
    */
   def listFilesSorted(
       remoteFs: FileSystem,
@@ -242,8 +237,12 @@ class SparkHadoopUtil extends Logging {
    * is valid the latest)?
    * This will return -ve (or 0) value if the fraction of validity has already expired.
    */
-  def getTimeFromNowToRenewal(fraction: Double, credentials: Credentials): Long = {
+  def getTimeFromNowToRenewal(
+      sparkConf: SparkConf,
+      fraction: Double,
+      credentials: Credentials): Long = {
     val now = System.currentTimeMillis()
+    val renewalInterval = sparkConf.getLong("spark.yarn.renewal.interval", (24 hours).toMillis)
     credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
       .map { t =>
         val identifier = new DelegationTokenIdentifier()
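The hunk above cuts off just as the per-token mapping begins. A hedged sketch of how the method presumably completes, consistent with the visible lines (the decode, arithmetic, and fold here are a reconstruction for illustration, not text from this commit):

// Hypothetical tail of getTimeFromNowToRenewal: for each HDFS token, compute
// the instant at which `fraction` of the renewal interval has elapsed, measured
// from the token's issue date, and return the latest such instant relative to `now`.
credentials.getAllTokens
  .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
  .map { t =>
    val identifier = new DelegationTokenIdentifier()
    identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
    (identifier.getIssueDate + fraction * renewalInterval).toLong - now
  }.foldLeft(0L)(math.max)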

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 5 deletions

@@ -17,15 +17,12 @@
 
 package org.apache.spark.executor
 
-import java.io.{ByteArrayInputStream, DataInputStream}
 import java.net.URL
 import java.nio.ByteBuffer
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
 
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
 import org.apache.spark.rpc._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -77,7 +74,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor=>
+    case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
@@ -195,7 +192,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
-     env.actorSystem.awaitTermination()
      tokenUpdaterOption.foreach(_.stop())
      env.rpcEnv.awaitTermination()
    }

docs/security.md

Lines changed: 1 addition & 1 deletion

@@ -32,7 +32,7 @@ SSL must be configured on each node and configured for each component involved i
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
 
-For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. Please note that the HDFS client configuration file, `hdfs-site.xml` on each executor node must have the value of `dfs.namenode.delegation.token.renew-interval` be the same as it is on the HDFS Namenode for this functionality.
+For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
 
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
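For context, the YARN-mode workflow changed above is driven entirely by the two `spark-submit` flags named in that paragraph. A hedged example invocation (the principal, keytab path, class, and jar are placeholders, not taken from this commit):

./bin/spark-submit \
  --master yarn-cluster \
  --principal user@EXAMPLE.COM \
  --keytab /etc/security/keytabs/user.keytab \
  --class com.example.StreamingApp \
  streaming-app.jar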

yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala

Lines changed: 6 additions & 7 deletions

@@ -32,12 +32,12 @@ import org.apache.spark.util.ThreadUtils
  * Streaming apps can run without interruption while writing to secure HDFS. The
  * scheduleLoginFromKeytab method is called on the driver when the
  * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC
- * once 75% of the expiry time of the original delegation tokens used for the container
+ * once 75% of the renewal interval of the original delegation tokens used for the container
  * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
  * pre-specified location - the prefix of which is specified in the sparkConf by
  * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
  * to a new file, with a monotonically increasing suffix). After this, the credentials are
- * updated once 75% of the new tokens validity has elapsed.
+ * updated once 75% of the new tokens renewal interval has elapsed.
  *
  * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
  * validity of the original tokens has elapsed. At that time the executor finds the
@@ -72,13 +72,12 @@ private[yarn] class AMDelegationTokenRenewer(
   val keytab = sparkConf.get("spark.yarn.keytab")
 
   /**
-   * Schedule the renewal of the tokens. If tokens have already expired, this method will
-   * synchronously renew them.
-   * @param runnable
+   * Schedule re-login and creation of new tokens. If tokens have already expired, this method
+   * will synchronously create new ones.
    */
   def scheduleRenewal(runnable: Runnable): Unit = {
     val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(0.75, credentials)
+    val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
     // Run now!
     if (renewalInterval <= 0) {
       logInfo("HDFS tokens have expired, creating new tokens now.")
@@ -164,7 +163,7 @@ private[yarn] class AMDelegationTokenRenewer(
       // Get a copy of the credentials
       override def run(): Void = {
         val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds, replaceExisting = true)
+        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
         null
       }
     })
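The 75%/80% split the class comment describes leaves a window in which executors (refreshing at 80%) always find tokens the AM already wrote (at 75%). A rough sketch of the scheduling pattern visible in scheduleRenewal, where `scheduler` and `renewalRunnable` are illustrative names rather than the class's actual private members:

import java.util.concurrent.TimeUnit

// Schedule the next AM re-login once 75% of the renewal interval has elapsed,
// as computed by getTimeFromNowToRenewal above.
val delayMillis = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
if (delayMillis <= 0) {
  renewalRunnable.run()  // tokens already past the threshold: create new ones now
} else {
  scheduler.schedule(renewalRunnable, delayMillis, TimeUnit.MILLISECONDS)
}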

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 0 additions & 3 deletions

@@ -283,9 +283,6 @@ private[spark] class ApplicationMaster(
     rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, 0, sparkConf, securityMgr)
     waitForSparkDriver()
     addAmIpFilter()
-    // If a principal and keytab have been set, use that to create new credentials for executors
-    // periodically
-    delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
     registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 25 additions & 2 deletions

@@ -17,9 +17,10 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, FileOutputStream}
+import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
 import java.util.UUID
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -402,6 +403,27 @@ private[spark] class Client(
     }
   }
 
+  /**
+   * Get the renewal interval for tokens.
+   */
+  private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
+    // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
+    // those will fail with an access control issue. So create new tokens with the logged in
+    // user as renewer.
+    val creds = new Credentials()
+    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(Set(stagingDirPath), hadoopConf, creds,
+      Some(sparkConf.get("spark.yarn.principal")))
+    val t = creds.getAllTokens
+      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+      .head
+    val newExpiration = t.renew(hadoopConf)
+    val identifier = new DelegationTokenIdentifier()
+    identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+    val interval = newExpiration - identifier.getIssueDate
+    logInfo(s"Renewal Interval set to $interval")
+    interval
+  }
+
   /**
    * Set up the environment for launching our ApplicationMaster container.
   */
@@ -420,8 +442,9 @@
       sparkConf.set(
         "spark.yarn.credentials.file", new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
+      val renewalInterval = getTokenRenewalInterval(stagingDirPath)
+      sparkConf.set("spark.yarn.renewal.interval", renewalInterval.toString)
     }
-
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 2 additions & 5 deletions

@@ -21,9 +21,6 @@ import java.io._
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
-import org.apache.hadoop.security.token.TokenIdentifier
-
-import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
@@ -116,10 +113,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
       paths: Set[Path],
       conf: Configuration,
       creds: Credentials,
-      replaceExisting: Boolean = false
+      renewer: Option[String] = None
     ): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
-      val delegTokenRenewer = getTokenRenewer(conf)
+      val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
       paths.foreach { dst =>
         val dstFs = dst.getFileSystem(conf)
         logInfo("getting token for namenode: " + dst)
