Commit 6c65da6

harishreedharan authored and tgravescs committed
[SPARK-5342] [YARN] Allow long running Spark apps to run on secure YARN/HDFS
Current Spark apps running on Secure YARN/HDFS would not be able to write data to HDFS after 7 days, since delegation tokens cannot be renewed beyond that. This means Spark Streaming apps will not be able to run on Secure YARN.

This commit adds basic functionality to fix this issue. In this patch:
- new parameters are added - principal and keytab, which can be used to login to a KDC
- the client logs in, and then gets tokens to start the AM
- the keytab is copied to the staging directory
- the AM waits for 60% of the time till expiry of the tokens and then logs in using the keytab
- each time after 60% of the time, new tokens are created and sent to the executors

Currently, to avoid complicating the architecture, we set the keytab and principal in the SparkHadoopUtil singleton, and schedule a login. Once the login is completed, a callback is scheduled.

This is being posted for feedback, so I can gather feedback on the general implementation.

There are currently a bunch of things to do:
- [x] logging
- [x] testing - I plan to manually test this soon. If you have ideas of how to add unit tests, comment.
- [x] add code to ensure that if these params are set in non-YARN cluster mode, we complain
- [x] documentation
- [x] Have the executors request for credentials from the AM, so that retries are possible.

Author: Hari Shreedharan <[email protected]>

Closes apache#4688 from harishreedharan/kerberos-longrunning and squashes the following commits:

36eb8a9 [Hari Shreedharan] Change the renewal interval config param. Fix a bunch of comments.
611923a [Hari Shreedharan] Make sure the namenodes are listed correctly for creating tokens.
09fe224 [Hari Shreedharan] Use token.renew to get token's renewal interval rather than using hdfs-site.xml
6963bbc [Hari Shreedharan] Schedule renewal in AM before starting user class. Else, a restarted AM cannot access HDFS if the user class tries to.
072659e [Hari Shreedharan] Fix build failure caused by thread factory getting moved to ThreadUtils.
f041dd3 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
42eead4 [Hari Shreedharan] Remove RPC part. Refactor and move methods around, use renewal interval rather than max lifetime to create new tokens.
ebb36f5 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
bc083e3 [Hari Shreedharan] Overload RegisteredExecutor to send tokens. Minor doc updates.
7b19643 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
8a4f268 [Hari Shreedharan] Added docs in the security guide. Changed some code to ensure that the renewer objects are created only if required.
e800c8b [Hari Shreedharan] Restore original RegisteredExecutor message, and send new tokens via NewTokens message.
0e9507e [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
7f1bc58 [Hari Shreedharan] Minor fixes, cleanup.
bcd11f9 [Hari Shreedharan] Refactor AM and Executor token update code into separate classes, also send tokens via akka on executor startup.
f74303c [Hari Shreedharan] Move the new logic into specialized classes. Add cleanup for old credentials files.
2f9975c [Hari Shreedharan] Ensure new tokens are written out immediately on AM restart. Also, pick up the latest suffix from HDFS if the AM is restarted.
61b2b27 [Hari Shreedharan] Account for AM restarts by making sure lastSuffix is read from the files on HDFS.
62c45ce [Hari Shreedharan] Relogin from keytab periodically.
fa233bd [Hari Shreedharan] Adding logging, fixing minor formatting and ordering issues.
42813b4 [Hari Shreedharan] Remove utils.sh, which was re-added due to merge with master.
0de27ee [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
55522e3 [Hari Shreedharan] Fix failure caused by Preconditions ambiguity.
9ef5f1b [Hari Shreedharan] Added explanation of how the credentials refresh works, some other minor fixes.
f4fd711 [Hari Shreedharan] Fix SparkConf usage.
2debcea [Hari Shreedharan] Change the file structure for credentials files. I will push a followup patch which adds a cleanup mechanism for old credentials files. The credentials files are small and few enough for it to cause issues on HDFS.
af6d5f0 [Hari Shreedharan] Cleaning up files where changes weren't required.
f0f54cb [Hari Shreedharan] Be more defensive when updating the credentials file.
f6954da [Hari Shreedharan] Got rid of Akka communication to renew, instead the executors check a known file's modification time to read the credentials.
5c11c3e [Hari Shreedharan] Move tests to YarnSparkHadoopUtil to fix compile issues.
b4cb917 [Hari Shreedharan] Send keytab to AM via DistributedCache rather than directly via HDFS
0985b4e [Hari Shreedharan] Write tokens to HDFS and read them back when required, rather than sending them over the wire.
d79b2b9 [Hari Shreedharan] Make sure correct credentials are passed to FileSystem#addDelegationTokens()
8c6928a [Hari Shreedharan] Fix issue caused by direct creation of Actor object.
fb27f46 [Hari Shreedharan] Make sure principal and keytab are set before CoarseGrainedSchedulerBackend is started. Also schedule re-logins in CoarseGrainedSchedulerBackend#start()
41efde0 [Hari Shreedharan] Merge branch 'master' into kerberos-longrunning
d282d7a [Hari Shreedharan] Fix ClientSuite to set YARN mode, so that the correct class is used in tests.
bcfc374 [Hari Shreedharan] Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with impl in YarnSparkHadoopUtil.
f8fe694 [Hari Shreedharan] Handle None if keytab-login is not scheduled.
2b0d745 [Hari Shreedharan] [SPARK-5342][YARN] Allow long running Spark apps to run on secure YARN/HDFS.
ccba5bc [Hari Shreedharan] WIP: More changes wrt kerberos
77914dd [Hari Shreedharan] WIP: Add kerberos principal and keytab to YARN client.
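
The timing rule in the description (act after a fixed fraction of the token's renewal window) can be sketched in plain Scala. This is a minimal illustration modeled on the arithmetic of `getTimeFromNowToRenewal` in the `SparkHadoopUtil` diff, not the commit's code: `RenewalTiming` and its parameter names are hypothetical, and token issue dates are passed in as plain millisecond values instead of being read from Hadoop `Credentials`.

```scala
// Hypothetical helper: the real method decodes issue dates from HDFS delegation tokens.
object RenewalTiming {

  /**
   * Millis from `now` until `fraction` of the renewal interval has elapsed for the
   * most recently issued token. The fold starts at 0, so the result is never negative.
   */
  def timeFromNowToRenewal(
      issueDates: Seq[Long],
      fraction: Double,
      renewalIntervalMs: Long,
      now: Long): Long =
    issueDates
      .map(issued => (issued + fraction * renewalIntervalMs).toLong - now)
      .foldLeft(0L)(math.max)
}

object RenewalTimingDemo extends App {
  val dayMs = 24L * 60 * 60 * 1000 // the 24 hour default interval used in the diff

  // 60% is the fraction named in the description; 0.8 is what the executor code passes.
  println(RenewalTiming.timeFromNowToRenewal(Seq(0L), 0.6, dayMs, now = 0L))
  println(RenewalTiming.timeFromNowToRenewal(Seq(0L), 0.8, dayMs, now = 0L))

  // Once the window has passed, the result is clamped to 0, which means "refresh now".
  println(RenewalTiming.timeFromNowToRenewal(Seq(0L), 0.8, dayMs, now = 2 * dayMs))
}
```

With a 24 hour interval this puts the 60% point at roughly 14.4 hours after issue and the 80% point at roughly 19.2 hours, which leaves the executors a margin after the AM is expected to have written new tokens.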
1 parent 7dacc08 commit 6c65da6

16 files changed, +635 -111 lines changed
Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+import scala.util.control.NonFatal
+
+private[spark] class ExecutorDelegationTokenUpdater(
+    sparkConf: SparkConf,
+    hadoopConf: Configuration) extends Logging {
+
+  @volatile private var lastCredentialsFileSuffix = 0
+
+  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
+
+  private val delegationTokenRenewer =
+    Executors.newSingleThreadScheduledExecutor(
+      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
+
+  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
+  private val executorUpdaterRunnable =
+    new Runnable {
+      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
+    }
+
+  def updateCredentialsIfRequired(): Unit = {
+    try {
+      val credentialsFilePath = new Path(credentialsFile)
+      val remoteFs = FileSystem.get(hadoopConf)
+      SparkHadoopUtil.get.listFilesSorted(
+        remoteFs, credentialsFilePath.getParent,
+        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
+        .lastOption.foreach { credentialsStatus =>
+        val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
+        if (suffix > lastCredentialsFileSuffix) {
+          logInfo("Reading new delegation tokens from " + credentialsStatus.getPath)
+          val newCredentials = getCredentialsFromHDFSFile(remoteFs, credentialsStatus.getPath)
+          lastCredentialsFileSuffix = suffix
+          UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
+          logInfo("Tokens updated from credentials file.")
+        } else {
+          // Check every hour to see if new credentials arrived.
+          logInfo("Updated delegation tokens were expected, but the driver has not updated the " +
+            "tokens yet, will check again in an hour.")
+          delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+          return
+        }
+      }
+      val timeFromNowToRenewal =
+        SparkHadoopUtil.get.getTimeFromNowToRenewal(
+          sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
+      if (timeFromNowToRenewal <= 0) {
+        executorUpdaterRunnable.run()
+      } else {
+        logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal millis.")
+        delegationTokenRenewer.schedule(
+          executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
+      }
+    } catch {
+      // Since the file may get deleted while we are reading it, catch the Exception and come
+      // back in an hour to try again
+      case NonFatal(e) =>
+        logWarning("Error while trying to update credentials, will try again in 1 hour", e)
+        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
+    }
+  }
+
+  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
+    val stream = remoteFs.open(tokenPath)
+    try {
+      val newCredentials = new Credentials()
+      newCredentials.readTokenStorageStream(stream)
+      newCredentials
+    } finally {
+      stream.close()
+    }
+  }
+
+  def stop(): Unit = {
+    delegationTokenRenewer.shutdown()
+  }
+
+}
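
The updater above follows a self-rescheduling pattern: each run checks for a newer credentials suffix and then queues exactly one follow-up on a single-thread scheduler. The sketch below isolates that pattern without any Hadoop dependency. It is a simplified illustration, not the class from this commit: `SuffixPoller`, `latestSuffix`, `retryDelayMs` and `refreshDelayMs` are hypothetical names, the HDFS listing is replaced by a function argument, and the delay is fixed instead of being derived from token issue dates.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical stand-in for the executor-side updater. `latestSuffix` replaces the
// HDFS listing; `retryDelayMs` and `refreshDelayMs` replace the real delays.
class SuffixPoller(
    latestSuffix: () => Option[Int],
    retryDelayMs: Long,
    refreshDelayMs: Long) {

  @volatile private var lastSeenSuffix = 0

  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  private val task = new Runnable {
    override def run(): Unit = pollAndReschedule()
  }

  def lastSeen: Int = lastSeenSuffix

  /** Returns true only when a suffix newer than the last one seen was picked up. */
  def checkOnce(): Boolean = latestSuffix() match {
    case Some(suffix) if suffix > lastSeenSuffix =>
      lastSeenSuffix = suffix
      true
    case _ =>
      false
  }

  /** Runs one poll, then queues exactly one follow-up poll on the scheduler. */
  def pollAndReschedule(): Unit = {
    val delayMs =
      try {
        if (checkOnce()) refreshDelayMs else retryDelayMs
      } catch {
        case NonFatal(_) => retryDelayMs // for example, the source failed mid-read
      }
    scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    scheduler.shutdownNow() // drops any pending poll
    ()
  }
}
```

Because the suffix only ever increases, a poll that sees the same file again is a no-op, so running the check more often than necessary is harmless.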

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 67 additions & 2 deletions
@@ -17,12 +17,16 @@
 
 package org.apache.spark.deploy
 
+import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
+import java.util.{Arrays, Comparator}
 
+import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.fs.FileSystem.Statistics
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,14 +36,16 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
+import scala.concurrent.duration._
 
 /**
  * :: DeveloperApi ::
  * Contains util methods to interact with Hadoop from Spark.
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration(new SparkConf())
+  private val sparkConf = new SparkConf()
+  val conf: Configuration = newConfiguration(sparkConf)
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -201,6 +207,61 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
 
+  /**
+   * Lists all the files in a directory with the specified prefix, and does not end with the
+   * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
+   * the respective files.
+   */
+  def listFilesSorted(
+      remoteFs: FileSystem,
+      dir: Path,
+      prefix: String,
+      exclusionSuffix: String): Array[FileStatus] = {
+    val fileStatuses = remoteFs.listStatus(dir,
+      new PathFilter {
+        override def accept(path: Path): Boolean = {
+          val name = path.getName
+          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+        }
+      })
+    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+      override def compare(o1: FileStatus, o2: FileStatus): Int = {
+        Longs.compare(o1.getModificationTime, o2.getModificationTime)
+      }
+    })
+    fileStatuses
+  }
+
+  /**
+   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
+   * is valid the latest)?
+   * This will return -ve (or 0) value if the fraction of validity has already expired.
+   */
+  def getTimeFromNowToRenewal(
+      sparkConf: SparkConf,
+      fraction: Double,
+      credentials: Credentials): Long = {
+    val now = System.currentTimeMillis()
+
+    val renewalInterval =
+      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)
+
+    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+      .map { t =>
+        val identifier = new DelegationTokenIdentifier()
+        identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+        (identifier.getIssueDate + fraction * renewalInterval).toLong - now
+      }.foldLeft(0L)(math.max)
+  }
+
+
+  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
+    val fileName = credentialsPath.getName
+    fileName.substring(
+      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
+  }
+
+
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
 
   /**
@@ -251,6 +312,10 @@ object SparkHadoopUtil {
     }
   }
 
+  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
+
+  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
+
   def get: SparkHadoopUtil = {
     hadoop
   }
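
How `listFilesSorted` and `getSuffixForCredentialsPath` combine to pick the newest credentials file can be illustrated on plain strings. This is a minimal sketch under stated assumptions: `CredentialFileNames` is a hypothetical helper, files are modeled as `(name, modificationTime)` pairs instead of `FileStatus` objects, and the `credentials-N` naming is assumed for illustration. Only the `-` delimiter and the `.tmp` extension come from the diff.

```scala
// Hypothetical helper; only the "-" delimiter and ".tmp" extension come from the diff.
object CredentialFileNames {
  val CounterDelim = "-"
  val TempExtension = ".tmp"

  /** Parses the integer after the last delimiter, e.g. "credentials-12" gives 12. */
  def suffixOf(fileName: String): Int =
    fileName.substring(fileName.lastIndexOf(CounterDelim) + 1).toInt

  /**
   * Keeps names that start with `prefix` and are not temp files, orders them by
   * modification time, and returns the most recently modified name, if any.
   */
  def latest(files: Seq[(String, Long)], prefix: String): Option[String] =
    files
      .filter { case (name, _) => name.startsWith(prefix) && !name.endsWith(TempExtension) }
      .sortBy { case (_, modificationTime) => modificationTime }
      .lastOption
      .map { case (name, _) => name }
}

object CredentialFileNamesDemo extends App {
  val files = Seq(
    ("credentials-1", 100L),
    ("credentials-3.tmp", 300L), // still being written, so it is skipped
    ("credentials-2", 200L),
    ("other-9", 400L))           // wrong prefix, so it is skipped

  val newest = CredentialFileNames.latest(files, "credentials")
  println(newest)
  println(newest.map(CredentialFileNames.suffixOf))
}
```

Excluding the temp extension presumably lets a writer stage a file under a temporary name and rename it when complete, so that readers never pick up a half-written file.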

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 4 additions & 0 deletions
@@ -401,6 +401,10 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
+      // Yarn client or cluster
+      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
+      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
+
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 0 deletions
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
+  var principal: String = null
+  var keytab: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -393,6 +395,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROXY_USER =>
         proxyUser = value
 
+      case PRINCIPAL =>
+        principal = value
+
+      case KEYTAB =>
+        keytab = value
+
       case HELP =>
         printUsageAndExit(0)
 
@@ -506,6 +514,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working directory of each executor.
+        |  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
+        |                              secure HDFS.
+        |  --keytab KEYTAB             The full path to the file that contains the keytab for the
+        |                              principal specified above. This keytab will be copied to
+        |                              the node running the Application Master via the Secure
+        |                              Distributed Cache, for renewing the login tickets and the
+        |                              delegation tokens periodically.
       """.stripMargin
     )
     SparkSubmit.exitFn()

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 12 additions & 1 deletion
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.rpc._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,6 +168,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           driverConf.set(key, value)
         }
       }
+      var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
+      if (driverConf.contains("spark.yarn.credentials.file")) {
+        logInfo("Will periodically update credentials from: " +
+          driverConf.get("spark.yarn.credentials.file"))
+        // Periodically update the credentials for this user to ensure HDFS tokens get updated.
+        tokenUpdaterOption =
+          Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
+        tokenUpdaterOption.get.updateCredentialsIfRequired()
+      }
+
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
@@ -183,6 +193,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
+      tokenUpdaterOption.foreach(_.stop())
     }
   }
 

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 3 additions & 1 deletion
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
+
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
     private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
+
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + executorId)
           context.reply(RegisteredExecutor)
-
           addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         properties += ((key, value))
       }
     }
+
     // TODO (prashant) send conf instead of properties
     driverEndpoint = rpcEnv.setupEndpoint(
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))

docs/security.md

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,8 @@ SSL must be configured on each node and configured for each component involved i
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
 
+For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
+
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
 

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java

Lines changed: 5 additions & 1 deletion
@@ -69,8 +69,10 @@ class SparkSubmitOptionParser {
   // YARN-only options.
   protected final String ARCHIVES = "--archives";
   protected final String EXECUTOR_CORES = "--executor-cores";
-  protected final String QUEUE = "--queue";
+  protected final String KEYTAB = "--keytab";
   protected final String NUM_EXECUTORS = "--num-executors";
+  protected final String PRINCIPAL = "--principal";
+  protected final String QUEUE = "--queue";
 
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -96,11 +98,13 @@ class SparkSubmitOptionParser {
     { EXECUTOR_MEMORY },
     { FILES },
     { JARS },
+    { KEYTAB },
     { KILL_SUBMISSION },
     { MASTER },
     { NAME },
     { NUM_EXECUTORS },
     { PACKAGES },
+    { PRINCIPAL },
     { PROPERTIES_FILE },
     { PROXY_USER },
     { PY_FILES },
