
Commit e1dd03e

Author: Marcelo Vanzin (committed)
[SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication.
The main goal of this change is to allow multiple cluster-mode submissions from the same JVM, without having them end up with mixed configuration. That is done by extending the SparkApplication trait, and doing so was reasonably trivial for standalone and Mesos modes.

For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE" system property to control behavior indirectly in a whole bunch of places, mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes. Most of the changes here are about removing that. Since we removed support for Hadoop 1.x, some methods that lived in YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining methods don't need to be part of the class, and can be called directly from the YarnSparkHadoopUtil object, so now there's a single implementation of SparkHadoopUtil.

There were two places in the code that relied on SPARK_YARN_MODE to make decisions about YARN-specific functionality; they now explicitly check the master from the configuration instead:

* fetching the external shuffle service port, which can come from the YARN configuration.
* propagation of the authentication secret using Hadoop credentials. This was also cleaned up a little so it no longer needs so many methods in `SparkHadoopUtil`.

With those out of the way, actually changing the YARN client to extend SparkApplication was easy.

Tested with existing unit tests, and also by running YARN apps with auth and kerberos both on and off in a real cluster.

Author: Marcelo Vanzin <[email protected]>

Closes #19631 from vanzin/SPARK-22372.
1 parent f81401e commit e1dd03e

25 files changed (+274, -396 lines)
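For orientation before the diffs: the SparkApplication trait this commit standardizes on exposes a single start(args, conf) entry point, as the ClientApp change below shows, so each submission gets its own explicit SparkConf instead of reading global state. A minimal, illustrative sketch of an implementation, placed in Spark's own deploy package since the trait is package-private; ExampleSubmitApp is a made-up name, not part of the patch:

package org.apache.spark.deploy

import org.apache.spark.SparkConf

// Minimal sketch of a cluster-mode client in the style this commit adopts.
// Configuration arrives as an explicit SparkConf argument, so two submissions
// running in the same JVM never share mutable global state such as system
// properties. ExampleSubmitApp is hypothetical and not part of the patch.
private[spark] class ExampleSubmitApp extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val master = conf.get("spark.master", "local[*]")
    // A real implementation would parse args and talk to the cluster manager here.
    println(s"Submitting ${args.mkString(" ")} to $master")
  }
}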

core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 51 additions & 51 deletions
@@ -19,17 +19,19 @@ package org.apache.spark
 
 import java.lang.{Byte => JByte}
 import java.net.{Authenticator, PasswordAuthentication}
+import java.nio.charset.StandardCharsets.UTF_8
 import java.security.{KeyStore, SecureRandom}
 import java.security.cert.X509Certificate
 import javax.net.ssl._
 
 import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
 
@@ -225,7 +227,6 @@ private[spark] class SecurityManager(
   setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
   setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", ""));
 
-  private val secretKey = generateSecretKey()
   logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
     "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
     "; users with view permissions: " + viewAcls.toString() +
@@ -416,50 +417,6 @@ private[spark] class SecurityManager(
 
   def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey
 
-  /**
-   * Generates or looks up the secret key.
-   *
-   * The way the key is stored depends on the Spark deployment mode. Yarn
-   * uses the Hadoop UGI.
-   *
-   * For non-Yarn deployments, If the config variable is not set
-   * we throw an exception.
-   */
-  private def generateSecretKey(): String = {
-    if (!isAuthenticationEnabled) {
-      null
-    } else if (SparkHadoopUtil.get.isYarnMode) {
-      // In YARN mode, the secure cookie will be created by the driver and stashed in the
-      // user's credentials, where executors can get it. The check for an array of size 0
-      // is because of the test code in YarnSparkHadoopUtilSuite.
-      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)
-      if (secretKey == null || secretKey.length == 0) {
-        logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
-        val rnd = new SecureRandom()
-        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
-        val secret = new Array[Byte](length)
-        rnd.nextBytes(secret)
-
-        val cookie = HashCodes.fromBytes(secret).toString()
-        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
-        cookie
-      } else {
-        new Text(secretKey).toString
-      }
-    } else {
-      // user must have set spark.authenticate.secret config
-      // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
-      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
-        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
-        case Some(value) => value
-        case None =>
-          throw new IllegalArgumentException(
-            "Error: a secret key must be specified via the " +
-              SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
-      }
-    }
-  }
-
   /**
    * Check to see if Acls for the UI are enabled
    * @return true if UI authentication is enabled, otherwise false
@@ -542,7 +499,51 @@ private[spark] class SecurityManager(
    * Gets the secret key.
    * @return the secret key as a String if authentication is enabled, otherwise returns null
    */
-  def getSecretKey(): String = secretKey
+  def getSecretKey(): String = {
+    if (isAuthenticationEnabled) {
+      val creds = UserGroupInformation.getCurrentUser().getCredentials()
+      Option(creds.getSecretKey(SECRET_LOOKUP_KEY))
+        .map { bytes => new String(bytes, UTF_8) }
+        .orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET)))
+        .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF))
+        .getOrElse {
+          throw new IllegalArgumentException(
+            s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config")
+        }
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Initialize the authentication secret.
+   *
+   * If authentication is disabled, do nothing.
+   *
+   * In YARN mode, generate a new secret and store it in the current user's credentials.
+   *
+   * In other modes, assert that the auth secret is set in the configuration.
+   */
+  def initializeAuth(): Unit = {
+    if (!sparkConf.get(NETWORK_AUTH_ENABLED)) {
+      return
+    }
+
+    if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") {
+      require(sparkConf.contains(SPARK_AUTH_SECRET_CONF),
+        s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.")
+      return
+    }
+
+    val rnd = new SecureRandom()
+    val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE
+    val secretBytes = new Array[Byte](length)
+    rnd.nextBytes(secretBytes)
+
+    val creds = new Credentials()
+    creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes)
+    UserGroupInformation.getCurrentUser().addCredentials(creds)
+  }
 
   // Default SecurityManager only has a single secret key, so ignore appId.
   override def getSaslUser(appId: String): String = getSaslUser()
@@ -551,13 +552,12 @@ private[spark] class SecurityManager(
 
 private[spark] object SecurityManager {
 
-  val SPARK_AUTH_CONF: String = "spark.authenticate"
-  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
+  val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
+  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
   // This is used to set auth secret to an executor's env variable. It should have the same
  // value as SPARK_AUTH_SECRET_CONF set in SparkConf
  val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET"
 
  // key used to store the spark secret in the Hadoop UGI
-  val SECRET_LOOKUP_KEY = "sparkCookie"
-
+  val SECRET_LOOKUP_KEY = new Text("sparkCookie")
 }
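The two methods added above replace generateSecretKey(): on YARN the driver creates the secret in initializeAuth() and stashes it in the current user's Hadoop credentials, and getSecretKey() reads it back from the UGI, falling back to the executor env variable or spark.authenticate.secret for other deployments. A small standalone sketch of that store/read round trip, not part of the patch, using the same "sparkCookie" lookup key; the secret value is made up:

import java.nio.charset.StandardCharsets.UTF_8

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Illustrative round trip: store a secret under the same UGI credentials key the
// driver uses, then read it back the way getSecretKey() does on an executor.
object SecretRoundTrip {
  def main(args: Array[String]): Unit = {
    val key = new Text("sparkCookie")

    val creds = new Credentials()
    creds.addSecretKey(key, "not-a-real-secret".getBytes(UTF_8))
    UserGroupInformation.getCurrentUser().addCredentials(creds)

    val readBack = Option(UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(key))
      .map(bytes => new String(bytes, UTF_8))
    println(readBack) // Some(not-a-real-secret)
  }
}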

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 0 additions & 3 deletions
@@ -413,8 +413,6 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     }
 
-    if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
-
     _listenerBus = new LiveListenerBus(_conf)
 
     // Initialize the app status store and listener before SparkEnv is created so that it gets
@@ -1955,7 +1953,6 @@ class SparkContext(config: SparkConf) extends Logging {
     // `SparkContext` is stopped.
     localProperties.remove()
     // Unset YARN mode system env variable, to allow switching between cluster types.
-    System.clearProperty("SPARK_YARN_MODE")
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
   }

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 4 additions & 0 deletions
@@ -234,6 +234,10 @@ object SparkEnv extends Logging {
     }
 
     val securityManager = new SecurityManager(conf, ioEncryptionKey)
+    if (isDriver) {
+      securityManager.initializeAuth()
+    }
+
     ioEncryptionKey.foreach { _ =>
       if (!securityManager.isEncryptionEnabled()) {
         logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 7 additions & 1 deletion
@@ -217,8 +217,13 @@
       println("Use ./bin/spark-submit with \"--master spark://host:port\"")
     }
     // scalastyle:on println
+    new ClientApp().start(args, new SparkConf())
+  }
+}
 
-  val conf = new SparkConf()
+private[spark] class ClientApp extends SparkApplication {
+
+  override def start(args: Array[String], conf: SparkConf): Unit = {
     val driverArgs = new ClientArguments(args)
 
     if (!conf.contains("spark.rpc.askTimeout")) {
@@ -235,4 +240,5 @@
 
     rpcEnv.awaitTermination()
   }
+
 }

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 6 additions & 42 deletions
@@ -75,9 +75,7 @@ class SparkHadoopUtil extends Logging {
   }
 
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
-    for (token <- source.getTokens.asScala) {
-      dest.addToken(token)
-    }
+    dest.addCredentials(source.getCredentials())
   }
 
   /**
@@ -120,16 +118,9 @@
    * Add any user credentials to the job conf which are necessary for running on a secure Hadoop
    * cluster.
    */
-  def addCredentials(conf: JobConf) {}
-
-  def isYarnMode(): Boolean = { false }
-
-  def addSecretKeyToUserCredentials(key: String, secret: String) {}
-
-  def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }
-
-  def getCurrentUserCredentials(): Credentials = {
-    UserGroupInformation.getCurrentUser().getCredentials()
+  def addCredentials(conf: JobConf): Unit = {
+    val jobCreds = conf.getCredentials()
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
   }
 
   def addCurrentUserCredentials(creds: Credentials): Unit = {
@@ -328,17 +319,6 @@
     }
   }
 
-  /**
-   * Start a thread to periodically update the current user's credentials with new credentials so
-   * that access to secured service does not fail.
-   */
-  private[spark] def startCredentialUpdater(conf: SparkConf) {}
-
-  /**
-   * Stop the thread that does the credential updates.
-   */
-  private[spark] def stopCredentialUpdater() {}
-
   /**
    * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism.
    * This is to prevent the DFSClient from using an old cached token to connect to the NameNode.
@@ -441,14 +421,7 @@
 
 object SparkHadoopUtil {
 
-  private lazy val hadoop = new SparkHadoopUtil
-  private lazy val yarn = try {
-    Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
-      .newInstance()
-      .asInstanceOf[SparkHadoopUtil]
-  } catch {
-    case e: Exception => throw new SparkException("Unable to load YARN support", e)
-  }
+  private lazy val instance = new SparkHadoopUtil
 
   val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
 
@@ -462,16 +435,7 @@
    */
   private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000
 
-  def get: SparkHadoopUtil = {
-    // Check each time to support changing to/from YARN
-    val yarnMode = java.lang.Boolean.parseBoolean(
-      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if (yarnMode) {
-      yarn
-    } else {
-      hadoop
-    }
-  }
+  def get: SparkHadoopUtil = instance
 
   /**
    * Given an expiration date (e.g. for Hadoop Delegation Tokens) return a the date
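A side effect of the simplified transferCredentials and addCredentials above is that whole Credentials objects are copied rather than individual tokens, which is what lets the auth secret travel alongside delegation tokens. A hedged standalone sketch of the difference, using only public Hadoop UGI APIs; the remote user name is made up:

import org.apache.hadoop.security.UserGroupInformation

// Copying the whole Credentials object moves delegation tokens *and* secret keys
// (such as the Spark auth secret), whereas the old token-only loop dropped secret keys.
object CopyAllCredentials {
  def main(args: Array[String]): Unit = {
    val source = UserGroupInformation.getCurrentUser()
    val target = UserGroupInformation.createRemoteUser("example-app-user")
    target.addCredentials(source.getCredentials())
    println(target.getCredentials().numberOfSecretKeys())
  }
}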

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 16 additions & 15 deletions
@@ -92,6 +92,12 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
+  // Following constants are visible for testing.
+  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
+    "org.apache.spark.deploy.yarn.YarnClusterApplication"
+  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
+  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
+
   // scalastyle:off println
   private[spark] def printVersionAndExit(): Unit = {
     printStream.println("""Welcome to
@@ -281,7 +287,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }
 
     // Make sure YARN is included in our build if we're trying to use it
-    if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
+    if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
       printErrorAndExit(
         "Could not load YARN classes. " +
         "This copy of Spark may not have been compiled with YARN support.")
@@ -363,24 +369,21 @@ object SparkSubmit extends CommandLineUtils with Logging {
     args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
     args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
 
-    // This security manager will not need an auth secret, but set a dummy value in case
-    // spark.authenticate is enabled, otherwise an exception is thrown.
-    lazy val downloadConf = sparkConf.clone().set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-    lazy val secMgr = new SecurityManager(downloadConf)
+    lazy val secMgr = new SecurityManager(sparkConf)
 
     // In client mode, download remote files.
     var localPrimaryResource: String = null
     var localJars: String = null
    var localPyFiles: String = null
    if (deployMode == CLIENT) {
      localPrimaryResource = Option(args.primaryResource).map {
-        downloadFile(_, targetDir, downloadConf, hadoopConf, secMgr)
+        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
      localJars = Option(args.jars).map {
-        downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
      localPyFiles = Option(args.pyFiles).map {
-        downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr)
+        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
    }
 
@@ -391,8 +394,6 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // For yarn client mode, since we already download them with above code, so we only need to
     // figure out the local path and replace the remote one.
     if (clusterManager == YARN) {
-      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
-      val secMgr = new SecurityManager(sparkConf)
       val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
 
       def shouldDownload(scheme: String): Boolean = {
@@ -409,7 +410,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
             if (file.exists()) {
               file.toURI.toString
             } else {
-              downloadFile(resource, targetDir, downloadConf, hadoopConf, secMgr)
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
             }
           case _ => uri.toString
         }
@@ -634,11 +635,11 @@ object SparkSubmit extends CommandLineUtils with Logging {
     // All Spark parameters are expected to be passed to the client through system properties.
     if (args.isStandaloneCluster) {
       if (args.useRest) {
-        childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+        childMainClass = REST_CLUSTER_SUBMIT_CLASS
         childArgs += (args.primaryResource, args.mainClass)
       } else {
         // In legacy standalone cluster mode, use Client as a wrapper around the user class
-        childMainClass = "org.apache.spark.deploy.Client"
+        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
         if (args.supervise) { childArgs += "--supervise" }
         Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
         Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
@@ -663,7 +664,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
     if (isYarnCluster) {
-      childMainClass = "org.apache.spark.deploy.yarn.Client"
+      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
       if (args.isPython) {
         childArgs += ("--primary-py-file", args.primaryResource)
         childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
@@ -684,7 +685,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
 
     if (isMesosCluster) {
       assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
-      childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
+      childMainClass = REST_CLUSTER_SUBMIT_CLASS
       if (args.isPython) {
         // Second argument is main class
         childArgs += (args.primaryResource, "")
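The new *_SUBMIT_CLASS constants all name SparkApplication implementations (YarnClusterApplication, RestSubmissionClientApp, ClientApp) rather than objects with a bare main method. A simplified, hedged sketch of the kind of dispatch childMainClass feeds into; this approximates SparkSubmit's behavior rather than copying it, and runChild / ExampleDispatch are made-up names:

package org.apache.spark.deploy

import org.apache.spark.SparkConf

// If the resolved childMainClass implements SparkApplication, instantiate it and
// call start() with the prepared SparkConf; otherwise fall back to a static main().
private[deploy] object ExampleDispatch {
  def runChild(childMainClass: String, childArgs: Seq[String], conf: SparkConf): Unit = {
    val mainClass = Class.forName(childMainClass)
    if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      val app = mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
      app.start(childArgs.toArray, conf)
    } else {
      val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
      mainMethod.invoke(null, childArgs.toArray)
    }
  }
}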
