
Commit 8a4f268

Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required.
1 parent e800c8b commit 8a4f268

File tree: 7 files changed (+113 additions, −108 deletions)

core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala

Lines changed: 33 additions & 34 deletions
@@ -17,62 +17,61 @@
 package org.apache.spark.deploy

 import java.util.concurrent.{Executors, TimeUnit}
-import java.util.{Comparator, Arrays}

-import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{PathFilter, FileStatus, Path, FileSystem}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}

-import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Utils

 private[spark] class ExecutorDelegationTokenUpdater(
     sparkConf: SparkConf,
     hadoopConf: Configuration) extends Logging {

   @volatile private var lastCredentialsFileSuffix = 0

-  private lazy val delegationTokenRenewer =
+  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+
+  private val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
       Utils.namedThreadFactory("Delegation Token Refresh Thread"))

   // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
-  private lazy val executorUpdaterRunnable =
+  private val executorUpdaterRunnable =
     new Runnable {
       override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
     }

   def updateCredentialsIfRequired(): Unit = {
     try {
-      sparkConf.getOption("spark.yarn.credentials.file").foreach { credentialsFile =>
-        val credentials = UserGroupInformation.getCurrentUser.getCredentials
-        val credentialsFilePath = new Path(credentialsFile)
-        val remoteFs = FileSystem.get(hadoopConf)
-        SparkHadoopUtil.get.listFilesSorted(
-          remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName, ".tmp")
-          .lastOption.foreach { credentialsStatus =>
-          val suffix = getSuffixForCredentialsPath(credentialsStatus)
-          if (suffix > lastCredentialsFileSuffix) {
-            logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
-            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
-            lastCredentialsFileSuffix = suffix
-            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-            val totalValidity = SparkHadoopUtil.get.getLatestTokenValidity(credentials) -
-              credentialsStatus.getModificationTime
-            val timeToRunRenewal =
-              credentialsStatus.getModificationTime + (0.8 * totalValidity).toLong
-            val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
-            logInfo("Updated delegation tokens, will check for new tokens in " +
-              timeFromNowToRenewal + " millis")
-            delegationTokenRenewer.schedule(
-              executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
-          } else {
-            // Check every hour to see if new credentials arrived.
-            logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
-              "tokens yet, will check again in an hour.")
-            delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
-          }
+      val credentials = UserGroupInformation.getCurrentUser.getCredentials
+      val credentialsFilePath = new Path(credentialsFile)
+      val remoteFs = FileSystem.get(hadoopConf)
+      SparkHadoopUtil.get.listFilesSorted(
+        remoteFs, credentialsFilePath.getParent, credentialsFilePath.getName, ".tmp")
+        .lastOption
+        .foreach { credentialsStatus =>
+          val suffix = getSuffixForCredentialsPath(credentialsStatus)
+          if (suffix > lastCredentialsFileSuffix) {
+            logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+            val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+            lastCredentialsFileSuffix = suffix
+            UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+            val totalValidity = SparkHadoopUtil.get.getLatestTokenValidity(credentials) -
+              credentialsStatus.getModificationTime
+            val timeToRunRenewal =
+              credentialsStatus.getModificationTime + (0.8 * totalValidity).toLong
+            val timeFromNowToRenewal = timeToRunRenewal - System.currentTimeMillis()
+            logInfo("Updated delegation tokens, will check for new tokens in " +
+              timeFromNowToRenewal + " millis")
+            delegationTokenRenewer.schedule(
+              executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+          } else {
+            // Check every hour to see if new credentials arrived.
+            logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+              "tokens yet, will check again in an hour.")
+            delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+          }
         }
-      }
     } catch {
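
The executor-side renewal arithmetic above is worth isolating: the next check is scheduled once 80% of the token validity window has elapsed, measured from the modification time of the credentials file the driver wrote. A minimal sketch of that computation follows (a hypothetical standalone helper, not part of the commit; it assumes `getLatestTokenValidity` returns an absolute expiry timestamp, which is what the subtraction in the diff implies):

// Hypothetical helper mirroring the scheduling math in updateCredentialsIfRequired.
// latestTokenExpiry: absolute expiry time of the newest token (millis since epoch).
// fileModificationTime: when the driver wrote the credentials file (millis since epoch).
def millisUntilNextCheck(latestTokenExpiry: Long, fileModificationTime: Long): Long = {
  val totalValidity = latestTokenExpiry - fileModificationTime
  val timeToRunRenewal = fileModificationTime + (0.8 * totalValidity).toLong
  timeToRunRenewal - System.currentTimeMillis()  // may be <= 0 if the window has already passed
}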

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 5 additions & 5 deletions
@@ -20,11 +20,11 @@ package org.apache.spark.deploy
 import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
-import java.util.{Comparator, Arrays}
+import java.util.{Arrays, Comparator}

 import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{PathFilter, FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
@@ -44,7 +44,7 @@ import scala.collection.JavaConversions._
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val sparkConf = new SparkConf()
+  private val sparkConf = new SparkConf()
   val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)

@@ -209,12 +209,12 @@ class SparkHadoopUtil extends Logging {

   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
-   * given suffix.
+   * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
+   * the respective files.
    * @param remoteFs
    * @param prefix
    * @return
    */
-
   def listFilesSorted(
       remoteFs: FileSystem,
       dir: Path,
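
The body of `listFilesSorted` is not shown in this diff, but the file keeps `Arrays`, `Comparator`, `Longs`, and `PathFilter` imported, which suggests an implementation along these lines. The following is a sketch consistent with the updated doc comment, not the committed code:

import java.util.{Arrays, Comparator}
import com.google.common.primitives.Longs
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

// Lists files in `dir` whose names start with `prefix` and do not end with
// `exclusionSuffix`, sorted oldest-first by modification time.
def listFilesSorted(
    remoteFs: FileSystem,
    dir: Path,
    prefix: String,
    exclusionSuffix: String): Array[FileStatus] = {
  val fileStatuses = remoteFs.listStatus(dir,
    new PathFilter {
      override def accept(path: Path): Boolean = {
        val name = path.getName
        name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
      }
    })
  Arrays.sort(fileStatuses, new Comparator[FileStatus] {
    override def compare(o1: FileStatus, o2: FileStatus): Int =
      Longs.compare(o1.getModificationTime, o2.getModificationTime)
  })
  fileStatuses
}

The oldest-first ordering is what lets callers take `.lastOption` for the newest credentials file and `.dropRight(5)` to spare the five newest files during cleanup.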

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 4 additions & 1 deletion
@@ -501,7 +501,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        | --principal PRINCIPAL      Principal to be used to login to KDC, while running on
        |                            secure HDFS.
        | --keytab KEYTAB            The full path to the file that contains the keytab for the
-       |                            principal specified above.
+       |                            principal specified above. This keytab will be copied to
+       |                            the node running the Application Master via the Secure
+       |                            Distributed Cache, for renewing the login tickets and the
+       |                            delegation tokens periodically.
      """.stripMargin
    )
    SparkSubmit.exitFn()
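
For context, the renewer classes in this commit read the keys `spark.yarn.principal` and `spark.yarn.keytab` from the `SparkConf`; the `--principal` and `--keytab` flags described above are presumably translated into those keys, though the argument-to-config mapping itself is not part of this diff. A hypothetical sketch of the resulting configuration (values are placeholders):

import org.apache.spark.SparkConf

// Hypothetical equivalent of passing --principal and --keytab to spark-submit.
val conf = new SparkConf()
  .set("spark.yarn.principal", "user@EXAMPLE.COM")   // from --principal
  .set("spark.yarn.keytab", "/path/to/user.keytab")  // from --keytab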

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 4 deletions
@@ -174,9 +174,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         driverConf.set(key, value)
       }
     }
-    // Periodically update the credentials for this user to ensure HDFS tokens get updated.
-    val tokenUpdater = new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf)
-    tokenUpdater.updateCredentialsIfRequired()
+    var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
+    if (driverConf.contains("spark.yarn.credentials.file")) {
+      // Periodically update the credentials for this user to ensure HDFS tokens get updated.
+      tokenUpdaterOption =
+        Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
+      tokenUpdaterOption.get.updateCredentialsIfRequired()
+    }
+
     val env = SparkEnv.createExecutorEnv(
       driverConf, executorId, hostname, port, cores, isLocal = false)

@@ -192,7 +197,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
     }
     env.actorSystem.awaitTermination()
-    tokenUpdater.stop()
+    tokenUpdaterOption.foreach(_.stop())
     env.rpcEnv.awaitTermination()
   }
 }
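
The change above replaces an unconditional construction with an `Option` that is populated only when `spark.yarn.credentials.file` is set, so executors in deployments without token renewal never create the renewer or its scheduler thread. An equivalent, slightly more idiomatic sketch of the same guard (not what the commit uses) would avoid the `var` and the `.get`:

// Sketch: expression-style guard with the same behavior as the committed code.
val tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] =
  if (driverConf.contains("spark.yarn.credentials.file")) {
    Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
  } else {
    None
  }
tokenUpdaterOption.foreach(_.updateCredentialsIfRequired())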

docs/security.md

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ SSL must be configured on each node and configured for each component involved i

 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
+For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master securely via the Hadoop Distributed Cache. The Kerberos login will be periodically renewed using this principal and keytab, and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.

 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
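
As a usage example for the YARN-mode paragraph above, a long-running job could be submitted with both flags; the principal, keytab path, class, and jar below are placeholders:

./bin/spark-submit --master yarn-cluster \
  --principal user@EXAMPLE.COM \
  --keytab /etc/security/keytabs/user.keytab \
  --class org.example.StreamingApp \
  streaming-app.jar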

yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala

Lines changed: 50 additions & 59 deletions
@@ -16,16 +16,14 @@
  */
 package org.apache.spark.deploy.yarn

-import java.io.{DataOutputStream, ByteArrayOutputStream}
+import java.io.{ByteArrayOutputStream, DataOutputStream}
 import java.nio.ByteBuffer
 import java.util.concurrent.{Executors, TimeUnit}

-import akka.actor.ActorSelection
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.security.UserGroupInformation

-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.NewTokens
 import org.apache.spark.{Logging, SparkConf}
@@ -50,18 +48,20 @@ import org.apache.spark.util.{SerializableBuffer, Utils}
  * appeared, it will read the credentials and update the currently running UGI with it. This
  * process happens again once 80% of the validity of this has expired.
  */
-class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration) extends Logging {
+private[yarn] class AMDelegationTokenRenewer(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration) extends Logging {

   private var lastCredentialsFileSuffix = 0

-  private lazy val delegationTokenRenewer =
+  private val delegationTokenRenewer =
     Executors.newSingleThreadScheduledExecutor(
       Utils.namedThreadFactory("Delegation Token Refresh Thread"))

   private var loggedInViaKeytab = false
   var driverEndPoint: RpcEndpointRef = null

-  private lazy val hadoopUtil = YarnSparkHadoopUtil.get
+  private val hadoopUtil = YarnSparkHadoopUtil.get

   /**
    * Schedule a login from the keytab and principal set using the --principal and --keytab
@@ -70,62 +70,51 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
    * to do the login. This method is a no-op in non-YARN mode.
    */
   private[spark] def scheduleLoginFromKeytab(): Unit = {
-    sparkConf.getOption("spark.yarn.principal").foreach { principal =>
-      val keytab = sparkConf.get("spark.yarn.keytab")
-
-      def getRenewalInterval: Long = {
-        import scala.concurrent.duration._
-        val credentials = UserGroupInformation.getCurrentUser.getCredentials
-        val interval = (0.75 * (hadoopUtil.getLatestTokenValidity(credentials) -
-          System.currentTimeMillis())).toLong
-        // If only 6 hours left, then force a renewal immediately. This is to avoid tokens with
-        // very little validity being used on AM restart.
-        if ((interval millis).toHours <= 6) {
-          0L
-        } else {
-          interval
-        }
+    val principal = sparkConf.get("spark.yarn.principal")
+    val keytab = sparkConf.get("spark.yarn.keytab")
+
+    def getRenewalInterval: Long = {
+      import scala.concurrent.duration._
+      val credentials = UserGroupInformation.getCurrentUser.getCredentials
+      val interval = (0.75 * (hadoopUtil.getLatestTokenValidity(credentials) -
+        System.currentTimeMillis())).toLong
+      // If only 6 hours left, then force a renewal immediately. This is to avoid tokens with
+      // very little validity being used on AM restart.
+      if ((interval millis).toHours <= 6) {
+        0L
+      } else {
+        interval
       }
+    }

-      def scheduleRenewal(runnable: Runnable): Unit = {
-        val renewalInterval = getRenewalInterval
-        logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
-        delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
-      }
+    def scheduleRenewal(runnable: Runnable): Unit = {
+      val renewalInterval = getRenewalInterval
+      logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
+      delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
+    }

-      // This thread periodically runs on the driver to update the delegation tokens on HDFS.
-      val driverTokenRenewerRunnable =
-        new Runnable {
-          override def run(): Unit = {
-            var wroteNewFiles = false
-            try {
-              writeNewTokensToHDFS(principal, keytab)
-              wroteNewFiles = true
-              cleanupOldFiles()
-            } catch {
-              case e: Exception =>
-                // If the exception was due to some issue deleting files, don't worry about it -
-                // just try to clean up next time. Else, reschedule for an hour later so new
-                // tokens get written out.
-                if (!wroteNewFiles) {
-                  logWarning("Failed to write out new credentials to HDFS, will try again in an " +
-                    "hour! If this happens too often tasks will fail.", e)
-                  delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
-                  return
-                } else {
-                  logWarning("Error while attempting to clean up old delegation token files. " +
-                    "Cleanup will be reattempted the next time new tokens are being written.")
-                }
-            }
-            scheduleRenewal(this)
+    // This thread periodically runs on the driver to update the delegation tokens on HDFS.
+    val driverTokenRenewerRunnable =
+      new Runnable {
+        override def run(): Unit = {
+          try {
+            writeNewTokensToHDFS(principal, keytab)
+            cleanupOldFiles()
+          } catch {
+            case e: Exception =>
+              // Log the error and try to write new tokens back in an hour
+              logWarning("Failed to write out new credentials to HDFS, will try again in an " +
+                "hour! If this happens too often tasks will fail.", e)
+              delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
+              return
           }
+          scheduleRenewal(this)
         }
-      // Schedule update of credentials. This handles the case of updating the tokens right now
-      // as well, since the renewal interval will be 0, and the thread will get scheduled
-      // immediately.
-      scheduleRenewal(driverTokenRenewerRunnable)
-
-    }
+      }
+    // Schedule update of credentials. This handles the case of updating the tokens right now
+    // as well, since the renewal interval will be 0, and the thread will get scheduled
+    // immediately.
+    scheduleRenewal(driverTokenRenewerRunnable)
   }

   // Keeps only files that are newer than 30 days, and deletes everything else. At least 5 files
@@ -135,9 +124,11 @@ class AMDelegationTokenRenewer(sparkConf: SparkConf, hadoopConf: Configuration)
     try {
       val remoteFs = FileSystem.get(hadoopConf)
       val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
+      val thresholdTime = System.currentTimeMillis() - (30 days).toMillis
       hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent, credentialsPath.getName, ".tmp").dropRight(5)
-        .takeWhile(_.getModificationTime < System.currentTimeMillis() - (30 days).toMillis)
+        remoteFs, credentialsPath.getParent, credentialsPath.getName, ".tmp")
+        .dropRight(5)
+        .takeWhile(_.getModificationTime < thresholdTime)
         .foreach(x => remoteFs.delete(x.getPath, true))
     } catch {
       // Such errors are not fatal, so don't throw. Make sure they are logged though
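
Two retention rules interact in `cleanupOldFiles` above, and the oldest-first ordering from `listFilesSorted` is what makes the chain correct: `dropRight(5)` always spares the five newest files, and `takeWhile` then stops at the first remaining file young enough to keep. A sketch of the policy on plain modification times (a hypothetical helper mirroring the committed logic):

// Given modification times sorted oldest-first, return the ones to delete:
// everything except the 5 newest files, and of those only files older than 30 days.
def modTimesToDelete(sortedModTimes: Seq[Long], now: Long): Seq[Long] = {
  val thresholdTime = now - 30L * 24 * 60 * 60 * 1000  // 30 days in millis
  sortedModTimes.dropRight(5).takeWhile(_ < thresholdTime)
}

Note also that the AM side schedules its keytab re-login at 75% of the remaining token validity (forcing an immediate renewal when less than six hours remain), while the executor side in ExecutorDelegationTokenUpdater polls at 80% of validity, so new tokens should be written out before the executors come looking for them.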

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 11 additions & 5 deletions
@@ -76,7 +76,7 @@ private[spark] class ApplicationMaster(
   // Fields used in cluster mode.
   private val sparkContextRef = new AtomicReference[SparkContext](null)

-  private lazy val delegationTokenRenewer = new AMDelegationTokenRenewer(sparkConf, yarnConf)
+  private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None

   final def run(): Int = {
     try {
@@ -140,6 +140,12 @@ private[spark] class ApplicationMaster(
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)

+    // If the credentials file config is present, we must periodically renew tokens. So create
+    // a new AMDelegationTokenRenewer.
+    if (sparkConf.contains("spark.yarn.credentials.file")) {
+      delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
+    }
+
     if (isClusterMode) {
       runDriver(securityMgr)
     } else {
@@ -204,7 +210,7 @@ private[spark] class ApplicationMaster(
         logDebug("shutting down user thread")
         userClassThread.interrupt()
       }
-      if (!inShutdown) delegationTokenRenewer.stop()
+      if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
     }
   }
 }
@@ -254,7 +260,7 @@ private[spark] class ApplicationMaster(
       SparkEnv.driverActorSystemName,
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
-    delegationTokenRenewer.driverEndPoint = driverEndpoint
+    delegationTokenRenewerOption.foreach(_.driverEndPoint = driverEndpoint)
     amEndpoint =
       rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
   }
@@ -281,7 +287,7 @@ private[spark] class ApplicationMaster(
     registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
     // If a principal and keytab have been set, use that to create new credentials for executors
     // periodically
-    delegationTokenRenewer.scheduleLoginFromKeytab()
+    delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
     userClassThread.join()
   }
@@ -292,7 +298,7 @@ private[spark] class ApplicationMaster(
     addAmIpFilter()
     // If a principal and keytab have been set, use that to create new credentials for executors
     // periodically
-    delegationTokenRenewer.scheduleLoginFromKeytab()
+    delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
     registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)

     // In client mode the actor will stop the reporter thread.
