Commit 42eead4

Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens.
1 parent ebb36f5 commit 42eead4
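
The core of the change: instead of deriving the next refresh time from a token's maximum lifetime (getMaxDate), the updater now schedules refreshes at a fraction (0.8) of the configured dfs.namenode.delegation.token.renew-interval, measured from each token's issue date. Below is a minimal sketch of that arithmetic with hypothetical example values, not the actual Hadoop token classes:

    object RenewalTimeSketch {
      // Time (in millis) from `now` until `fraction` of the renewal interval has
      // elapsed since the token was issued; <= 0 means "refresh immediately".
      def timeFromNowToRenewal(
          issueDate: Long,
          renewalIntervalMs: Long,
          fraction: Double,
          now: Long): Long =
        (issueDate + fraction * renewalIntervalMs).toLong - now

      def main(args: Array[String]): Unit = {
        val now = System.currentTimeMillis()
        val issueDate = now - 10L * 60 * 60 * 1000      // example: issued 10 hours ago
        val renewalIntervalMs = 24L * 60 * 60 * 1000    // 24h, the default used by this patch
        // 0.8 * 24h = 19.2h after issue, so roughly 9.2h from now in this example.
        println(timeFromNowToRenewal(issueDate, renewalIntervalMs, 0.8, now))
      }
    }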

9 files changed: 128 additions & 126 deletions


core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala

Lines changed: 20 additions & 20 deletions
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 
+import scala.util.control.NonFatal
+
 private[spark] class ExecutorDelegationTokenUpdater(
     sparkConf: SparkConf,
     hadoopConf: Configuration) extends Logging {
@@ -45,39 +47,41 @@ private[spark] class ExecutorDelegationTokenUpdater(
 
   def updateCredentialsIfRequired(): Unit = {
     try {
-      val credentials = UserGroupInformation.getCurrentUser.getCredentials
       val credentialsFilePath = new Path(credentialsFile)
       val remoteFs = FileSystem.get(hadoopConf)
       SparkHadoopUtil.get.listFilesSorted(
-        remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName, ".tmp")
-        .lastOption
-        .foreach { credentialsStatus =>
-          val suffix = getSuffixForCredentialsPath(credentialsStatus)
+        remoteFs, credentialsFilePath.getParent,
+        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.foreach { credentialsStatus =>
+          val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
           if (suffix > lastCredentialsFileSuffix) {
             logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
             val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
             lastCredentialsFileSuffix = suffix
             UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-            val totalValidity = SparkHadoopUtil.get.getLatestTokenValidity(credentials) -
-              credentialsStatus.getModificationTime
-            val timeToRunRenewal =
-              credentialsStatus.getModificationTime + (0.8 * totalValidity).toLong
-            val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
-            logInfo("Updated delegation tokens, will check for new tokens in " +
-              timeFromNowToRenewal + " millis")
-            delegationTokenRenewer.schedule(
-              executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+            logInfo("Tokens updated from credentials file.")
           } else {
            // Check every hour to see if new credentials arrived.
            logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
              "tokens yet, will check again in an hour.")
            delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+           return
          }
        }
+      val timeFromNowToRenewal =
+        SparkHadoopUtil.get.getTimeFromNowToRenewal(
+          0.8, UserGroupInformation.getCurrentUser.getCredentials)
+      if (timeFromNowToRenewal <= 0) {
+        executorUpdaterRunnable.run()
+      } else {
+        logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
+        delegationTokenRenewer.schedule(
+          executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+      }
     } catch {
       // Since the file may get deleted while we are reading it, catch the Exception and come
       // back in an hour to try again
-      case e: Exception =>
+      case NonFatal(e) =>
        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
     }
@@ -87,7 +91,7 @@
     val stream = remoteFs.open(tokenPath)
     try {
       val newCredentials = new Credentials()
-      newCredentials.readFields(stream)
+      newCredentials.readTokenStorageStream(stream)
       newCredentials
     } finally {
       stream.close()
@@ -98,8 +102,4 @@
     delegationTokenRenewer.shutdown()
   }
 
-  private def getSuffixForCredentialsPath(credentialsStatus: FileStatus): Int = {
-    val fileName = credentialsStatus.getPath.getName
-    fileName.substring(fileName.lastIndexOf("-") + 1).toInt
-  }
 }
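
After a successful pass, the updater now computes the time remaining until the renewal fraction is reached and either runs the refresh immediately (if that point has already passed) or schedules it on the single-threaded renewer. A stripped-down sketch of that run-now-or-schedule pattern using a plain ScheduledExecutorService (illustrative names, not the Spark classes):

    import java.util.concurrent.{Executors, TimeUnit}

    object ScheduleOrRunNow {
      private val renewer = Executors.newSingleThreadScheduledExecutor()

      // Run `task` immediately if the delay is non-positive, otherwise schedule it.
      def scheduleOrRun(delayMs: Long)(task: => Unit): Unit = {
        val runnable = new Runnable { override def run(): Unit = task }
        if (delayMs <= 0) runnable.run()
        else renewer.schedule(runnable, delayMs, TimeUnit.MILLISECONDS)
      }

      def main(args: Array[String]): Unit = {
        scheduleOrRun(-1) { println("renewal point already passed, refreshing now") }
        scheduleOrRun(2000) { println("refreshing after 2 seconds") }
        renewer.shutdown()  // previously scheduled delayed tasks still run
      }
    }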

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 21 additions & 5 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
+import scala.concurrent.duration._
 
 /**
  * :: DeveloperApi ::
@@ -46,6 +47,8 @@ class SparkHadoopUtil extends Logging {
   private val sparkConf = new SparkConf()
   val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)
+  private lazy val renewalInterval =
+    conf.getLong("dfs.namenode.delegation.token.renew-interval", (24 hours).toMillis)
 
   /**
    * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
@@ -235,19 +238,28 @@
   }
 
   /**
-   * Get the latest validity of the HDFS token in the Credentials object.
-   * @param credentials
-   * @return
+   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
+   * is valid the latest)?
+   * This will return -ve (or 0) value if the fraction of validity has already expired.
    */
-  def getLatestTokenValidity(credentials: Credentials): Long = {
+  def getTimeFromNowToRenewal(fraction: Double, credentials: Credentials): Long = {
+    val now = System.currentTimeMillis()
     credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
       .map { t =>
         val identifier = new DelegationTokenIdentifier()
         identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-        identifier.getMaxDate
+        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
       }.foldLeft(0L)(math.max)
   }
 
+
+  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+    val fileName = credentialsPath.getName
+    fileName.substring(
+      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+  }
+
+
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
 
   /**
@@ -298,6 +310,10 @@ object SparkHadoopUtil {
     }
   }
 
+  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
   def get: SparkHadoopUtil = {
     hadoop
   }
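
Credentials files are expected to carry a counter after the last delimiter (and the .tmp extension while still being written), and getSuffixForCredentialsPath recovers that counter so executors only read files newer than the last one they processed. A small hedged sketch of the parse, on hypothetical file names:

    object CredentialsSuffixSketch {
      val CounterDelim = "-"  // mirrors SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM

      // "credentials-3" -> 3; everything after the last delimiter is treated as the counter.
      def suffixFor(fileName: String): Int =
        fileName.substring(fileName.lastIndexOf(CounterDelim) + 1).toInt

      def main(args: Array[String]): Unit = {
        println(suffixFor("credentials-0"))    // 0
        println(suffixFor("my-app-creds-17"))  // 17, only the last "-" matters
      }
    }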

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 5 additions & 10 deletions
@@ -55,7 +55,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       driver = Some(ref)
-      ref.sendWithReply[RegisteredExecutor](
+      ref.sendWithReply[RegisteredExecutor.type](
        RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     } onComplete {
       case Success(msg) => Utils.tryLogNonFatalError {
@@ -72,17 +72,10 @@
   }
 
   override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredExecutor(tokens) =>
+    case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
       executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
-      tokens.foreach { tokenBuffer =>
-        val inStream = new DataInputStream(new ByteArrayInputStream(tokenBuffer.value.array()))
-        val creds = new Credentials()
-        creds.readFields(inStream)
-        inStream.close()
-        UserGroupInformation.getCurrentUser.addCredentials(creds)
-      }
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -175,7 +168,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }
     var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
-    if(driverConf.contains("spark.yarn.credentials.file")) {
+    if (driverConf.contains("spark.yarn.credentials.file")) {
+      logInfo("Will periodically update credentials from: " +
+        driverConf.get("spark.yarn.credentials.file"))
      // Periodically update the credentials for this user to ensure HDFS tokens get updated.
      tokenUpdaterOption =
        Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 4 deletions
@@ -35,10 +35,7 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
-  case class RegisteredExecutor(tokens: Option[SerializableBuffer])
-    extends CoarseGrainedClusterMessage
-
-  case class NewTokens(tokens: SerializableBuffer) extends CoarseGrainedClusterMessage
+  case object RegisteredExecutor extends CoarseGrainedClusterMessage
 
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 3 deletions
@@ -66,7 +66,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
 
-  private var latestTokens: Option[SerializableBuffer] = None
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -115,7 +114,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
          logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
        }
 
-      case NewTokens(tokens) => latestTokens = Option(tokens)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -125,7 +123,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        } else {
          logInfo("Registered executor: " + executorRef + " with ID " + executorId)
-         context.reply(RegisteredExecutor(latestTokens))
+         context.reply(RegisteredExecutor)
          addressToExecutorId(executorRef.address) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
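
With tokens no longer piggybacked on registration, RegisteredExecutor is a payload-free case object: the driver replies with the object itself and the executor backend matches on it directly (hence the RegisteredExecutor.type type parameter on sendWithReply). A tiny sketch of that message-shape change, outside of Spark's RPC layer and with hypothetical names:

    sealed trait ClusterMessage
    case object RegisteredExecutorAck extends ClusterMessage                // no token payload any more
    case class RegisterExecutorFailure(reason: String) extends ClusterMessage

    object MessageShapeSketch {
      def handle(msg: ClusterMessage): Unit = msg match {
        case RegisteredExecutorAck        => println("registered with driver")
        case RegisterExecutorFailure(why) => println(s"registration failed: $why")
      }

      def main(args: Array[String]): Unit = {
        handle(RegisteredExecutorAck)
        handle(RegisterExecutorFailure("duplicate executor ID"))
      }
    }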

docs/security.md

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ SSL must be configured on each node and configured for each component involved i
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
 
-For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
+For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS. Please note that the HDFS client configuration file, `hdfs-site.xml` on each executor node must have the value of `dfs.namenode.delegation.token.renew-interval` be the same as it is on the HDFS Namenode for this functionality.
 
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
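
For reference, the submission shape this documentation paragraph describes looks roughly as follows; the principal, keytab path, class name, and jar are placeholders, only `--principal` and `--keytab` come from the docs:

    ./bin/spark-submit \
      --master yarn-cluster \
      --principal user@EXAMPLE.COM \
      --keytab /path/to/user.keytab \
      --class com.example.StreamingApp \
      streaming-app.jar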
