
Commit b1f4ca8

harishreedharan authored and tgravescs committed
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS
Take 2. Does the same thing as apache#4688, but fixes the Hadoop-1 build.

Author: Hari Shreedharan <[email protected]>

Closes apache#5823 from harishreedharan/kerberos-longrunning and squashes the following commits:

3c86bba [Hari Shreedharan] Import fixes. Import postfixOps explicitly.
4d04301 [Hari Shreedharan] Minor formatting fixes.
b5e7a72 [Hari Shreedharan] Remove reflection, use a method in SparkHadoopUtil to update the token renewer.
7bff6e9 [Hari Shreedharan] Make sure all required classes are present in the jar. Fix import order.
e851f70 [Hari Shreedharan] Move the ExecutorDelegationTokenRenewer to yarn module. Use reflection to use it.
36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments.
611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens.
09fe224 [Hari Shreedharan] Use token.renew to get token's renewal interval rather than using hdfs-site.xml
6963bbc [Hari Shreedharan] Schedule renewal in AM before starting user class. Else, a restarted AM cannot access HDFS if the user class tries to.
072659e [Hari Shreedharan] Fix build failure caused by thread factory getting moved to ThreadUtils.
f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens.
ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates.
7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required.
e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message.
0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
7f1bc58 [Hari Shreedharan] Minor fixes, cleanup.
bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup.
f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files.
2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pick up the latest suffix from HDFS if the AM is restarted.
61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS.
62c45ce [Hari Shreedharan] Relogin from keytab periodically.
fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues.
42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master.
0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity.
9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes.
f4fd711 [Hari Shreedharan] Fix SparkConf usage.
2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough for it to cause issues on HDFS.
af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required.
f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file.
f6954da [Hari Shreedharan] Got rid of Akka communication to renew, instead the executors check a known file's modification time to read the credentials.
5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues.
b4cb917 [Hari Shreedharan] Send keytab to AM via DistributedCache rather than directly via HDFS
0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire.
d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens()
8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object.
fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start()
41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests.
bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with impl in YarnSparkHadoopUtil.
f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled.
2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS.
ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos
77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
1 parent 4dc8d74 commit b1f4ca8

16 files changed: +657 additions, -109 deletions

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 79 additions & 2 deletions
@@ -17,12 +17,16 @@

 package org.apache.spark.deploy

+import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}

+import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,14 +36,17 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils

 import scala.collection.JavaConversions._
+import scala.concurrent.duration._
+import scala.language.postfixOps

 /**
  * :: DeveloperApi ::
  * Contains util methods to interact with Hadoop from Spark.
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration(new SparkConf())
+  private val sparkConf = new SparkConf()
+  val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)

   /**
@@ -201,6 +208,61 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }

+  /**
+   * Lists all the files in a directory with the specified prefix, and does not end with the
+   * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
+   * the respective files.
+   */
+  def listFilesSorted(
+      remoteFs: FileSystem,
+      dir: Path,
+      prefix: String,
+      exclusionSuffix: String): Array[FileStatus] = {
+    val fileStatuses = remoteFs.listStatus(dir,
+      new PathFilter {
+        override def accept(path: Path): Boolean = {
+          val name = path.getName
+          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+        }
+      })
+    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+      override def compare(o1: FileStatus, o2: FileStatus): Int = {
+        Longs.compare(o1.getModificationTime, o2.getModificationTime)
+      }
+    })
+    fileStatuses
+  }
+
+  /**
+   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
+   * is valid the latest)?
+   * This will return -ve (or 0) value if the fraction of validity has already expired.
+   */
+  def getTimeFromNowToRenewal(
+      sparkConf: SparkConf,
+      fraction: Double,
+      credentials: Credentials): Long = {
+    val now = System.currentTimeMillis()
+
+    val renewalInterval =
+      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+      .map { t =>
+        val identifier = new DelegationTokenIdentifier()
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+      }.foldLeft(0L)(math.max)
+  }
+
+
+  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+    val fileName = credentialsPath.getName
+    fileName.substring(
+      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+  }
+
+
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

   /**
@@ -231,6 +293,17 @@
       }
     }
   }
+
+  /**
+   * Start a thread to periodically update the current user's credentials with new delegation
+   * tokens so that writes to HDFS do not fail.
+   */
+  private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}
+
+  /**
+   * Stop the thread that does the delegation token updates.
+   */
+  private[spark] def stopExecutorDelegationTokenRenewer() {}
 }

 object SparkHadoopUtil {
@@ -251,6 +324,10 @@ object SparkHadoopUtil {
     }
   }

+  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
   def get: SparkHadoopUtil = {
     hadoop
   }
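Note: the two helpers above, listFilesSorted and getTimeFromNowToRenewal, are the building blocks the rest of this patch uses to decide when to refresh delegation tokens. As a rough illustration of how they fit together (a hedged sketch, not code from this commit; the object name, the scheduleRenewal method, and the 0.75 fraction are made up), an AM-side thread could ask getTimeFromNowToRenewal how long to wait before the next refresh:

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// Hedged sketch: schedule the next credential refresh at roughly 75% of the
// configured renewal interval, using the helper added in this commit.
object RenewalSchedulingSketch {
  private val pool = Executors.newSingleThreadScheduledExecutor()

  def scheduleRenewal(sparkConf: SparkConf): Unit = {
    val credentials = UserGroupInformation.getCurrentUser.getCredentials
    // Time (ms) until 75% of the renewal interval of the latest HDFS token has elapsed.
    val delay = SparkHadoopUtil.get.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
    val renewTask = new Runnable {
      override def run(): Unit = {
        // A real renewer would re-login from the keytab here, obtain fresh tokens,
        // write them to the credentials file on HDFS, and then reschedule itself.
        scheduleRenewal(sparkConf)
      }
    }
    // A non-positive delay means the validity fraction has already expired; renew now.
    pool.schedule(renewTask, math.max(0L, delay), TimeUnit.MILLISECONDS)
  }
}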

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 0 deletions
@@ -400,6 +400,10 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

+      // Yarn client or cluster
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 0 deletions
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
+  var principal: String = null
+  var keytab: String = null

   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROXY_USER =>
         proxyUser = value

+      case PRINCIPAL =>
+        principal = value
+
+      case KEYTAB =>
+        keytab = value
+
       case HELP =>
         printUsageAndExit(0)

@@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |  --num-executors NUM         Number of executors to launch (Default: 2).
        |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
        |                              working directory of each executor.
+        |  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
+        |                              secure HDFS.
+        |  --keytab KEYTAB             The full path to the file that contains the keytab for the
+        |                              principal specified above. This keytab will be copied to
+        |                              the node running the Application Master via the Secure
+        |                              Distributed Cache, for renewing the login tickets and the
+        |                              delegation tokens periodically.
       """.stripMargin
     )
     SparkSubmit.exitFn()

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 9 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.executor
 import java.net.URL
 import java.nio.ByteBuffer

+import org.apache.hadoop.conf.Configuration
+
 import scala.collection.mutable
 import scala.util.{Failure, Success}

@@ -168,6 +170,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           driverConf.set(key, value)
         }
       }
+      if (driverConf.contains("spark.yarn.credentials.file")) {
+        logInfo("Will periodically update credentials from: " +
+          driverConf.get("spark.yarn.credentials.file"))
+        SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
+      }
+
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)

@@ -183,6 +191,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
+      SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
     }
   }
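Note: per the commit description above ("the executors check a known file's modification time to read the credentials"), startExecutorDelegationTokenRenewer is a no-op here and implemented in the yarn module. A hedged sketch of what one refresh pass on the executor side might look like, using the SparkHadoopUtil helpers added in this commit (not code from this commit; ExecutorCredentialsSketch, readLatestCredentials, and the use of the file name as the prefix are illustrative assumptions):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// Hedged sketch: read the newest completed credentials file written by the AM and
// merge its tokens into the current user's UGI so HDFS writes keep working.
object ExecutorCredentialsSketch {
  def readLatestCredentials(sparkConf: SparkConf): Unit = {
    val hadoopUtil = SparkHadoopUtil.get
    val credentialsFile = new Path(sparkConf.get("spark.yarn.credentials.file"))
    val remoteFs = FileSystem.get(hadoopUtil.conf)
    // Skip in-progress files that still carry the .tmp extension.
    val completedFiles = hadoopUtil.listFilesSorted(
      remoteFs,
      credentialsFile.getParent,
      credentialsFile.getName,
      SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    // Files are sorted by modification time, so the last one is the newest.
    completedFiles.lastOption.foreach { latest =>
      val stream = remoteFs.open(latest.getPath)
      try {
        val credentials = new Credentials()
        credentials.readTokenStorageStream(stream)
        UserGroupInformation.getCurrentUser.addCredentials(credentials)
      } finally {
        stream.close()
      }
    }
  }
}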

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 1 deletion
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
+
     override protected def log = CoarseGrainedSchedulerBackend.this.log

     private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // Ignoring the task kill since the executor is not registered.
           logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
+
     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + executorId)
           context.reply(RegisteredExecutor)
-
           addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         properties += ((key, value))
       }
     }
+
     // TODO (prashant) send conf instead of properties
     driverEndpoint = rpcEnv.setupEndpoint(
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))

docs/security.md

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,8 @@ SSL must be configured on each node and configured for each component involved i
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.

+For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
+
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.

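Note: the paragraph added to security.md describes periodic Kerberos re-login from the keytab. In Hadoop terms this boils down to standard UserGroupInformation calls along the following lines (a hedged sketch, not code from this commit; the principal and keytab path are placeholders). At submission time, the same information is what the new `--principal` and `--keytab` flags carry to `spark-submit`.

import org.apache.hadoop.security.UserGroupInformation

// Hedged sketch: the keytab-based login and periodic TGT refresh that the
// security guide paragraph refers to. Principal and keytab path are placeholders.
object KeytabLoginSketch {
  def main(args: Array[String]): Unit = {
    // Log in once from the keytab that was shipped via the distributed cache.
    UserGroupInformation.loginUserFromKeytab("user@EXAMPLE.COM", "/path/to/user.keytab")
    // Periodically make sure the TGT is still valid; the AM does this on a schedule
    // so that fresh HDFS delegation tokens can be obtained and written out.
    UserGroupInformation.getLoginUser.checkTGTAndReloginFromKeytab()
  }
}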

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 5 additions & 1 deletion
@@ -69,8 +69,10 @@ class SparkSubmitOptionParser {
   // YARN-only options.
   protected final String ARCHIVES = "--archives";
   protected final String EXECUTOR_CORES = "--executor-cores";
-  protected final String QUEUE = "--queue";
+  protected final String KEYTAB = "--keytab";
   protected final String NUM_EXECUTORS = "--num-executors";
+  protected final String PRINCIPAL = "--principal";
+  protected final String QUEUE = "--queue";

   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -96,11 +98,13 @@
     { EXECUTOR_MEMORY },
     { FILES },
     { JARS },
+    { KEYTAB },
     { KILL_SUBMISSION },
     { MASTER },
     { NAME },
     { NUM_EXECUTORS },
     { PACKAGES },
+    { PRINCIPAL },
     { PROPERTIES_FILE },
     { PROXY_USER },
     { PY_FILES },
