From c80554bed52f96ada2e1c4b6a16f5f3e5c7e5317 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 30 Oct 2017 14:06:41 -0700 Subject: [PATCH 01/10] [SPARK-22372][core, yarn] Make cluster submission use SparkApplication. The main goal of this change is to allow multiple cluster-mode submissions from the same JVM, without having them end up with mixed configuration. That is done by extending the SparkApplication trait, and doing so was reasonably trivial for standalone and mesos modes. For YARN mode, there was a complication. YARN used a "SPARK_YARN_MODE" system property to control behavior indirectly in a whole bunch of places, mainly in the SparkHadoopUtil / YarnSparkHadoopUtil classes. Most of the changes here are removing that. Since we removed support for Hadoop 1.x, some methods that lived in YarnSparkHadoopUtil can now live in SparkHadoopUtil. The remaining methods don't need to be part of the class, and can be called directly from the YarnSparkHadoopUtil object, so now there's a single implementation of SparkHadoopUtil. One remaining use case was fetching the external shuffle service port, which can come from the YARN configuration. That is now done by checking the master used to submit the app, instead of the system property. The other use case was the propagation of the auth secret. That was done by stashing the secret in the current user's credentials in YARN mode. Instead, the secret is now propagated using the config / env variable like for standalone and mesos. That allowed a few methods in SparkHadoopUtil to go away. There's still a little bit of code in SecurityManager that is currently YARN-specific, but that uses the conf's master to detect whether the app is a YARN app. This also has the benefit of not stashing the secret in a shared location (the current UGI), making sure different apps use different secrets. With those out of the way, actually changing the YARN client to extend SparkApplication was easy. Tested with existing unit tests, and also by running YARN apps with auth and kerberos both on and off in a real cluster. --- .../org/apache/spark/SecurityManager.scala | 105 +++++++------- .../scala/org/apache/spark/SparkContext.scala | 3 - .../scala/org/apache/spark/SparkEnv.scala | 4 + .../org/apache/spark/deploy/Client.scala | 8 +- .../apache/spark/deploy/SparkHadoopUtil.scala | 48 +------ .../org/apache/spark/deploy/SparkSubmit.scala | 28 ++-- .../deploy/rest/RestSubmissionClient.scala | 31 ++-- .../CoarseGrainedExecutorBackend.scala | 10 +- .../scala/org/apache/spark/util/Utils.scala | 6 +- .../apache/spark/SecurityManagerSuite.scala | 21 ++- .../spark/deploy/SparkSubmitSuite.scala | 8 +- .../rest/StandaloneRestSubmitSuite.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 59 ++++---- .../org/apache/spark/deploy/yarn/Client.scala | 71 ++++----- .../spark/deploy/yarn/ExecutorRunnable.scala | 4 + .../spark/deploy/yarn/YarnRMClient.scala | 2 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 135 ++++++------------ .../yarn/security/AMCredentialRenewer.scala | 2 +- .../cluster/YarnClientSchedulerBackend.scala | 4 +- .../cluster/YarnClusterSchedulerBackend.scala | 2 +- .../spark/deploy/yarn/ClientSuite.scala | 8 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 20 ++- .../yarn/YarnSparkHadoopUtilSuite.scala | 49 +------ ...ARNHadoopDelegationTokenManagerSuite.scala | 2 +- 24 files changed, 271 insertions(+), 361 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 2480e56b72ccf..5fe7e54560739 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -25,11 +25,10 @@ import javax.net.ssl._ import com.google.common.hash.HashCodes import com.google.common.io.Files -import org.apache.hadoop.io.Text -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.sasl.SecretKeyHolder import org.apache.spark.util.Utils @@ -225,7 +224,6 @@ private[spark] class SecurityManager( setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", "")); setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", "")); - private val secretKey = generateSecretKey() logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") + "; ui acls " + (if (aclsOn) "enabled" else "disabled") + "; users with view permissions: " + viewAcls.toString() + @@ -416,50 +414,6 @@ private[spark] class SecurityManager( def getIOEncryptionKey(): Option[Array[Byte]] = ioEncryptionKey - /** - * Generates or looks up the secret key. - * - * The way the key is stored depends on the Spark deployment mode. Yarn - * uses the Hadoop UGI. - * - * For non-Yarn deployments, If the config variable is not set - * we throw an exception. - */ - private def generateSecretKey(): String = { - if (!isAuthenticationEnabled) { - null - } else if (SparkHadoopUtil.get.isYarnMode) { - // In YARN mode, the secure cookie will be created by the driver and stashed in the - // user's credentials, where executors can get it. The check for an array of size 0 - // is because of the test code in YarnSparkHadoopUtilSuite. - val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY) - if (secretKey == null || secretKey.length == 0) { - logDebug("generateSecretKey: yarn mode, secret key from credentials is null") - val rnd = new SecureRandom() - val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE - val secret = new Array[Byte](length) - rnd.nextBytes(secret) - - val cookie = HashCodes.fromBytes(secret).toString() - SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie) - cookie - } else { - new Text(secretKey).toString - } - } else { - // user must have set spark.authenticate.secret config - // For Master/Worker, auth secret is in conf; for Executors, it is in env variable - Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET)) - .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match { - case Some(value) => value - case None => - throw new IllegalArgumentException( - "Error: a secret key must be specified via the " + - SecurityManager.SPARK_AUTH_SECRET_CONF + " config") - } - } - } - /** * Check to see if Acls for the UI are enabled * @return true if UI authentication is enabled, otherwise false @@ -542,7 +496,55 @@ private[spark] class SecurityManager( * Gets the secret key. * @return the secret key as a String if authentication is enabled, otherwise returns null */ - def getSecretKey(): String = secretKey + def getSecretKey(): String = { + if (isAuthenticationEnabled) { + Option(sparkConf.getenv(ENV_AUTH_SECRET)) + .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF)) + .getOrElse { + throw new IllegalArgumentException( + s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config") + } + } else { + null + } + } + + /** + * Initialize the configuration object held by this class for authentication. + * + * If authentication is disabled, do nothing. + * + * In YARN mode, generate a secret key and store it in the configuration object, setting it up to + * also be propagated to executors using an env variable. + * + * In other modes, assert that the auth secret is set in the configuration. + */ + def initializeAuth(): Unit = { + if (!sparkConf.get(NETWORK_AUTH_ENABLED)) { + return + } + + if (sparkConf.get(SparkLauncher.SPARK_MASTER, null) != "yarn") { + require(sparkConf.contains(SPARK_AUTH_SECRET_CONF), + s"A secret key must be specified via the $SPARK_AUTH_SECRET_CONF config.") + return + } + + // In YARN, force creation of a new secret if this is client mode. This ensures each + // YARN app uses a different secret. For cluster mode, this relies on YARN's client to + // not propagate the secret to the driver, which will then generate a new one. + val deployMode = sparkConf.get(SparkLauncher.DEPLOY_MODE, "client") + if (!sparkConf.contains(SPARK_AUTH_SECRET_CONF) || deployMode == "client") { + val rnd = new SecureRandom() + val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE + val secretBytes = new Array[Byte](length) + rnd.nextBytes(secretBytes) + + val secret = HashCodes.fromBytes(secretBytes).toString() + sparkConf.set(SPARK_AUTH_SECRET_CONF, secret) + sparkConf.setExecutorEnv(ENV_AUTH_SECRET, secret) + } + } // Default SecurityManager only has a single secret key, so ignore appId. override def getSaslUser(appId: String): String = getSaslUser() @@ -551,13 +553,10 @@ private[spark] class SecurityManager( private[spark] object SecurityManager { - val SPARK_AUTH_CONF: String = "spark.authenticate" - val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret" + val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key + val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret" // This is used to set auth secret to an executor's env variable. It should have the same // value as SPARK_AUTH_SECRET_CONF set in SparkConf val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET" - // key used to store the spark secret in the Hadoop UGI - val SECRET_LOOKUP_KEY = "sparkCookie" - } diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6f25d346e6e54..24b66d6bf9c46 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -412,8 +412,6 @@ class SparkContext(config: SparkConf) extends Logging { } } - if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") - _listenerBus = new LiveListenerBus(_conf) // "_jobProgressListener" should be set up before creating SparkEnv because when creating @@ -1943,7 +1941,6 @@ class SparkContext(config: SparkConf) extends Logging { // `SparkContext` is stopped. localProperties.remove() // Unset YARN mode system env variable, to allow switching between cluster types. - System.clearProperty("SPARK_YARN_MODE") SparkContext.clearActiveContext() logInfo("Successfully stopped SparkContext") } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 24928150315e8..72123f2232532 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -234,6 +234,10 @@ object SparkEnv extends Logging { } val securityManager = new SecurityManager(conf, ioEncryptionKey) + if (isDriver) { + securityManager.initializeAuth() + } + ioEncryptionKey.foreach { _ => if (!securityManager.isEncryptionEnabled()) { logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " + diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7acb5c55bb252..d5145094ec079 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -217,8 +217,13 @@ object Client { println("Use ./bin/spark-submit with \"--master spark://host:port\"") } // scalastyle:on println + new ClientApp().start(args, new SparkConf()) + } +} - val conf = new SparkConf() +private[spark] class ClientApp extends SparkApplication { + + override def start(args: Array[String], conf: SparkConf): Unit = { val driverArgs = new ClientArguments(args) if (!conf.contains("spark.rpc.askTimeout")) { @@ -235,4 +240,5 @@ object Client { rpcEnv.awaitTermination() } + } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 1fa10ab943f34..640c673e7fafe 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -75,9 +75,7 @@ class SparkHadoopUtil extends Logging { } def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { - for (token <- source.getTokens.asScala) { - dest.addToken(token) - } + dest.addCredentials(source.getCredentials()) } /** @@ -120,16 +118,9 @@ class SparkHadoopUtil extends Logging { * Add any user credentials to the job conf which are necessary for running on a secure Hadoop * cluster. */ - def addCredentials(conf: JobConf) {} - - def isYarnMode(): Boolean = { false } - - def addSecretKeyToUserCredentials(key: String, secret: String) {} - - def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null } - - def getCurrentUserCredentials(): Credentials = { - UserGroupInformation.getCurrentUser().getCredentials() + def addCredentials(conf: JobConf): Unit = { + val jobCreds = conf.getCredentials() + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } def addCurrentUserCredentials(creds: Credentials): Unit = { @@ -317,17 +308,6 @@ class SparkHadoopUtil extends Logging { } } - /** - * Start a thread to periodically update the current user's credentials with new credentials so - * that access to secured service does not fail. - */ - private[spark] def startCredentialUpdater(conf: SparkConf) {} - - /** - * Stop the thread that does the credential updates. - */ - private[spark] def stopCredentialUpdater() {} - /** * Return a fresh Hadoop configuration, bypassing the HDFS cache mechanism. * This is to prevent the DFSClient from using an old cached token to connect to the NameNode. @@ -430,14 +410,7 @@ class SparkHadoopUtil extends Logging { object SparkHadoopUtil { - private lazy val hadoop = new SparkHadoopUtil - private lazy val yarn = try { - Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") - .newInstance() - .asInstanceOf[SparkHadoopUtil] - } catch { - case e: Exception => throw new SparkException("Unable to load YARN support", e) - } + private lazy val instance = new SparkHadoopUtil val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp" @@ -451,16 +424,7 @@ object SparkHadoopUtil { */ private[spark] val UPDATE_INPUT_METRICS_INTERVAL_RECORDS = 1000 - def get: SparkHadoopUtil = { - // Check each time to support changing to/from YARN - val yarnMode = java.lang.Boolean.parseBoolean( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if (yarnMode) { - yarn - } else { - hadoop - } - } + def get: SparkHadoopUtil = instance /** * Returns a Configuration object with Spark configuration applied on top. Unlike diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 73b956ef3e470..5a08084d270f9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -92,6 +92,11 @@ object SparkSubmit extends CommandLineUtils with Logging { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 + // Following constants are visible for testing. + private[deploy] val YARN_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication" + private[deploy] val REST_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() + private[deploy] val STANDALONE_SUBMIT_CLASS = classOf[ClientApp].getName() + // scalastyle:off println private[spark] def printVersionAndExit(): Unit = { printStream.println("""Welcome to @@ -281,7 +286,7 @@ object SparkSubmit extends CommandLineUtils with Logging { } // Make sure YARN is included in our build if we're trying to use it - if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { + if (!Utils.classIsLoadable(YARN_SUBMIT_CLASS) && !Utils.isTesting) { printErrorAndExit( "Could not load YARN classes. " + "This copy of Spark may not have been compiled with YARN support.") @@ -365,8 +370,7 @@ object SparkSubmit extends CommandLineUtils with Logging { // This security manager will not need an auth secret, but set a dummy value in case // spark.authenticate is enabled, otherwise an exception is thrown. - lazy val downloadConf = sparkConf.clone().set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") - lazy val secMgr = new SecurityManager(downloadConf) + lazy val secMgr = new SecurityManager(sparkConf) // In client mode, download remote files. var localPrimaryResource: String = null @@ -374,13 +378,13 @@ object SparkSubmit extends CommandLineUtils with Logging { var localPyFiles: String = null if (deployMode == CLIENT) { localPrimaryResource = Option(args.primaryResource).map { - downloadFile(_, targetDir, downloadConf, hadoopConf, secMgr) + downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull localJars = Option(args.jars).map { - downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr) + downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull localPyFiles = Option(args.pyFiles).map { - downloadFileList(_, targetDir, downloadConf, hadoopConf, secMgr) + downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr) }.orNull } @@ -391,8 +395,6 @@ object SparkSubmit extends CommandLineUtils with Logging { // For yarn client mode, since we already download them with above code, so we only need to // figure out the local path and replace the remote one. if (clusterManager == YARN) { - sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") - val secMgr = new SecurityManager(sparkConf) val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES) def shouldDownload(scheme: String): Boolean = { @@ -409,7 +411,7 @@ object SparkSubmit extends CommandLineUtils with Logging { if (file.exists()) { file.toURI.toString } else { - downloadFile(resource, targetDir, downloadConf, hadoopConf, secMgr) + downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr) } case _ => uri.toString } @@ -634,11 +636,11 @@ object SparkSubmit extends CommandLineUtils with Logging { // All Spark parameters are expected to be passed to the client through system properties. if (args.isStandaloneCluster) { if (args.useRest) { - childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient" + childMainClass = REST_SUBMIT_CLASS childArgs += (args.primaryResource, args.mainClass) } else { // In legacy standalone cluster mode, use Client as a wrapper around the user class - childMainClass = "org.apache.spark.deploy.Client" + childMainClass = STANDALONE_SUBMIT_CLASS if (args.supervise) { childArgs += "--supervise" } Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) } Option(args.driverCores).foreach { c => childArgs += ("--cores", c) } @@ -663,7 +665,7 @@ object SparkSubmit extends CommandLineUtils with Logging { // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { - childMainClass = "org.apache.spark.deploy.yarn.Client" + childMainClass = YARN_SUBMIT_CLASS if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") @@ -684,7 +686,7 @@ object SparkSubmit extends CommandLineUtils with Logging { if (isMesosCluster) { assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API") - childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient" + childMainClass = REST_SUBMIT_CLASS if (args.isPython) { // Second argument is main class childArgs += (args.primaryResource, "") diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala index 21cb94142b15b..742a95841a138 100644 --- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala @@ -32,6 +32,7 @@ import scala.util.control.NonFatal import com.fasterxml.jackson.core.JsonProcessingException import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException} +import org.apache.spark.deploy.SparkApplication import org.apache.spark.internal.Logging import org.apache.spark.util.Utils @@ -398,9 +399,20 @@ private[spark] object RestSubmissionClient { val PROTOCOL_VERSION = "v1" /** - * Submit an application, assuming Spark parameters are specified through the given config. - * This is abstracted to its own method for testing purposes. + * Filter non-spark environment variables from any environment. */ + private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = { + env.filterKeys { k => + // SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345) + (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") || + k.startsWith("MESOS_") + } + } +} + +private[spark] class RestSubmissionClientApp extends SparkApplication { + + /** Submits a request to run the application and return the response. Visible for testing. */ def run( appResource: String, mainClass: String, @@ -417,7 +429,7 @@ private[spark] object RestSubmissionClient { client.createSubmission(submitRequest) } - def main(args: Array[String]): Unit = { + override def start(args: Array[String], conf: SparkConf): Unit = { if (args.length < 2) { sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]") sys.exit(1) @@ -425,19 +437,8 @@ private[spark] object RestSubmissionClient { val appResource = args(0) val mainClass = args(1) val appArgs = args.slice(2, args.length) - val conf = new SparkConf - val env = filterSystemEnvironment(sys.env) + val env = RestSubmissionClient.filterSystemEnvironment(sys.env) run(appResource, mainClass, appArgs, conf, env) } - /** - * Filter non-spark environment variables from any environment. - */ - private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = { - env.filterKeys { k => - // SPARK_HOME is filtered out because it is usually wrong on the remote machine (SPARK-12345) - (k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED" && k != "SPARK_HOME") || - k.startsWith("MESOS_") - } - } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index d27362ae85bea..9cdc3bb669094 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) - SparkHadoopUtil.get.startCredentialUpdater(driverConf) + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") + .getMethod("startCredentialUpdater", classOf[SparkConf]) + .invoke(null, driverConf) } cfg.hadoopDelegationCreds.foreach { hadoopCreds => @@ -233,7 +235,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } env.rpcEnv.awaitTermination() - SparkHadoopUtil.get.stopCredentialUpdater() + if (driverConf.contains("spark.yarn.credentials.file")) { + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") + .getMethod("stopCredentialUpdater") + .invoke(null) + } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 930e09d90c2f5..51bf91614c866 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -50,6 +50,7 @@ import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.hadoop.security.UserGroupInformation +import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ @@ -59,6 +60,7 @@ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} @@ -2405,8 +2407,8 @@ private[spark] object Utils extends Logging { */ def getSparkOrYarnConfig(conf: SparkConf, key: String, default: String): String = { val sparkValue = conf.get(key, default) - if (SparkHadoopUtil.get.isYarnMode) { - SparkHadoopUtil.get.newConfiguration(conf).get(key, sparkValue) + if (conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") { + new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(conf)).get(key, sparkValue) } else { sparkValue } diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index 9801b2638cc15..c37925046b990 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark import java.io.File +import org.apache.spark.internal.config._ +import org.apache.spark.launcher.SparkLauncher import org.apache.spark.security.GroupMappingServiceProvider import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils} @@ -411,8 +413,12 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { test("missing secret authentication key") { val conf = new SparkConf().set("spark.authenticate", "true") + val mgr = new SecurityManager(conf) intercept[IllegalArgumentException] { - new SecurityManager(conf) + mgr.getSecretKey() + } + intercept[IllegalArgumentException] { + mgr.initializeAuth() } } @@ -430,5 +436,18 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { assert(keyFromEnv === new SecurityManager(conf2).getSecretKey()) } + test("secret key generation in yarn mode") { + val conf = new SparkConf() + .set(NETWORK_AUTH_ENABLED, true) + .set(SparkLauncher.SPARK_MASTER, "yarn") + val mgr = new SecurityManager(conf) + mgr.initializeAuth() + assert(mgr.getSecretKey() != null) + + val confVal = conf.get(SecurityManager.SPARK_AUTH_SECRET_CONF) + val envVal = conf.get("spark.executorEnv." + SecurityManager.ENV_AUTH_SECRET) + assert(confVal === envVal) + } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index cfbf56fb8c369..1c813532f3582 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -232,7 +232,7 @@ class SparkSubmitSuite childArgsStr should include ("--class org.SomeClass") childArgsStr should include ("--arg arg1 --arg arg2") childArgsStr should include regex ("--jar .*thejar.jar") - mainClass should be ("org.apache.spark.deploy.yarn.Client") + mainClass should be (SparkSubmit.YARN_SUBMIT_CLASS) // In yarn cluster mode, also adding jars to classpath classpath(0) should endWith ("thejar.jar") @@ -320,11 +320,11 @@ class SparkSubmitSuite val childArgsStr = childArgs.mkString(" ") if (useRest) { childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2") - mainClass should be ("org.apache.spark.deploy.rest.RestSubmissionClient") + mainClass should be (SparkSubmit.REST_SUBMIT_CLASS) } else { childArgsStr should startWith ("--supervise --memory 4g --cores 5") childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2" - mainClass should be ("org.apache.spark.deploy.Client") + mainClass should be (SparkSubmit.STANDALONE_SUBMIT_CLASS) } classpath should have size 0 sys.props("SPARK_SUBMIT") should be ("true") @@ -399,7 +399,7 @@ class SparkSubmitSuite conf.get("spark.executor.memory") should be ("5g") conf.get("spark.master") should be ("yarn") conf.get("spark.submit.deployMode") should be ("cluster") - mainClass should be ("org.apache.spark.deploy.yarn.Client") + mainClass should be (SparkSubmit.YARN_SUBMIT_CLASS) } test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") { diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala index 490baf040491f..e505bc018857d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala @@ -92,7 +92,7 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach { conf.set("spark.app.name", "dreamer") val appArgs = Array("one", "two", "six") // main method calls this - val response = RestSubmissionClient.run("app-resource", "main-class", appArgs, conf) + val response = new RestSubmissionClientApp().run("app-resource", "main-class", appArgs, conf) val submitResponse = getSubmitResponse(response) assert(submitResponse.action === Utils.getFormattedClassName(submitResponse)) assert(submitResponse.serverSparkVersion === SPARK_VERSION) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 244d912b9f3aa..f8cc8a10d9c62 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -56,11 +56,33 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. - private val sparkConf = new SparkConf() - private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) - .asInstanceOf[YarnConfiguration] private val isClusterMode = args.userClass != null + private val sparkConf = new SparkConf() + if (args.propertiesFile != null) { + Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) => + sparkConf.set(k, v) + } + } + + // Initialize the security manager for authentication, if enabled. This needs to be done + // before the config is propagated to the system properties. + private val securityMgr = new SecurityManager(sparkConf) + if (isClusterMode) { + securityMgr.initializeAuth() + } + + // Set system properties for each config entry. This covers two use cases: + // - The default configuration stored by the SparkHadoopUtil class + // - The user application creating a new SparkConf in cluster mode + // + // Both cases create a new SparkConf object which reads these configs from system properties. + sparkConf.getAll.foreach { case (k, v) => + sys.props(k) = v + } + + private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) + private val ugi = { val original = UserGroupInformation.getCurrentUser() @@ -311,7 +333,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, yarnConf, - conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) + conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf)) val credentialRenewer = new AMCredentialRenewer(sparkConf, yarnConf, credentialManager) @@ -323,13 +345,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends credentialRenewerThread.join() } - // Call this to force generation of secret so it gets populated into the Hadoop UGI. - val securityMgr = new SecurityManager(sparkConf) - if (isClusterMode) { - runDriver(securityMgr) + runDriver() } else { - runExecutorLauncher(securityMgr) + runExecutorLauncher() } } catch { case e: Exception => @@ -410,8 +429,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends _sparkConf: SparkConf, _rpcEnv: RpcEnv, driverRef: RpcEndpointRef, - uiAddress: Option[String], - securityMgr: SecurityManager) = { + uiAddress: Option[String]) = { val appId = client.getAttemptId().getApplicationId().toString() val attemptId = client.getAttemptId().getAttemptId().toString() val historyAddress = @@ -463,7 +481,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends YarnSchedulerBackend.ENDPOINT_NAME) } - private def runDriver(securityMgr: SecurityManager): Unit = { + private def runDriver(): Unit = { addAmIpFilter(None) userClassThread = startUserApplication() @@ -479,7 +497,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends val driverRef = createSchedulerRef( sc.getConf.get("spark.driver.host"), sc.getConf.get("spark.driver.port")) - registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) + registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl)) registered = true } else { // Sanity check; should never happen in normal operation, since sc should only be null @@ -498,15 +516,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } } - private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { + private def runExecutorLauncher(): Unit = { val hostname = Utils.localHostName val amCores = sparkConf.get(AM_CORES) rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr, amCores, true) val driverRef = waitForSparkDriver() addAmIpFilter(Some(driverRef)) - registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress"), - securityMgr) + registerAM(sparkConf, rpcEnv, driverRef, sparkConf.getOption("spark.driver.appUIAddress")) registered = true // In client mode the actor will stop the reporter thread. @@ -690,6 +707,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends if (args.primaryRFile != null && args.primaryRFile.endsWith(".R")) { // TODO(davies): add R dependencies here } + val mainMethod = userClassLoader.loadClass(args.userClass) .getMethod("main", classOf[Array[String]]) @@ -813,15 +831,6 @@ object ApplicationMaster extends Logging { def main(args: Array[String]): Unit = { SignalUtils.registerLogger(log) val amArgs = new ApplicationMasterArguments(args) - - // Load the properties file with the Spark configuration and set entries as system properties, - // so that user code run inside the AM also has access to them. - // Note: we must do this before SparkHadoopUtil instantiated - if (amArgs.propertiesFile != null) { - Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) => - sys.props(k) = v - } - } master = new ApplicationMaster(amArgs) System.exit(master.run()) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 1fe25c4ddaabf..3c23ef723e317 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -48,7 +48,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil} import org.apache.spark.deploy.yarn.config._ import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager import org.apache.spark.internal.Logging @@ -58,18 +58,14 @@ import org.apache.spark.util.{CallerContext, Utils} private[spark] class Client( val args: ClientArguments, - val hadoopConf: Configuration, val sparkConf: SparkConf) extends Logging { import Client._ import YarnSparkHadoopUtil._ - def this(clientArgs: ClientArguments, spConf: SparkConf) = - this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) - private val yarnClient = YarnClient.createYarnClient - private val yarnConf = new YarnConfiguration(hadoopConf) + private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster" @@ -125,7 +121,7 @@ private[spark] class Client( private val credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) + conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf)) def reportLauncherState(state: SparkAppHandle.State): Unit = { launcherBackend.setState(state) @@ -134,8 +130,6 @@ private[spark] class Client( def stop(): Unit = { launcherBackend.close() yarnClient.stop() - // Unset YARN mode system env variable, to allow switching between cluster types. - System.clearProperty("SPARK_YARN_MODE") } /** @@ -152,7 +146,7 @@ private[spark] class Client( // Setup the credentials before doing anything else, // so we have don't have issues at any point. setupCredentials() - yarnClient.init(yarnConf) + yarnClient.init(hadoopConf) yarnClient.start() logInfo("Requesting a new application from cluster with %d NodeManagers" @@ -398,7 +392,7 @@ private[spark] class Client( if (SparkHadoopUtil.get.isProxyUser(currentUser)) { currentUser.addCredentials(credentials) } - logDebug(YarnSparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) + logDebug(SparkHadoopUtil.get.dumpTokens(credentials).mkString("\n")) } // If we use principal and keytab to login, also credentials can be renewed some time @@ -745,7 +739,7 @@ private[spark] class Client( // Save the YARN configuration into a separate file that will be overlayed on top of the // cluster's Hadoop conf. confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE)) - yarnConf.writeXml(confStream) + hadoopConf.writeXml(confStream) confStream.closeEntry() // Save Spark configuration to a file in the archive. @@ -753,7 +747,12 @@ private[spark] class Client( sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } // Override spark.yarn.key to point to the location in distributed cache which will be used // by AM. - Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) } + Option(amKeytabFileName).foreach { k => + // Do not propagate the app's secret using the config file. + if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) { + props.setProperty(KEYTAB.key, k) + } + } confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8) props.store(writer, "Spark configuration.") @@ -773,8 +772,7 @@ private[spark] class Client( pySparkArchives: Seq[String]): HashMap[String, String] = { logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() - populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) - env("SPARK_YARN_MODE") = "true" + populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH)) env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() if (loginFromKeytab) { @@ -824,6 +822,11 @@ private[spark] class Client( } } sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _)) + } else { + // Propagate the auth secret to the AM using the environment, if set. + sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF).foreach { secret => + env(SecurityManager.ENV_AUTH_SECRET) = secret + } } sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp => @@ -848,6 +851,7 @@ private[spark] class Client( } else { Nil } + val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives) val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives) @@ -978,7 +982,11 @@ private[spark] class Client( logDebug("YARN AM launch context:") logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}") logDebug(" env:") - launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") } + if (log.isDebugEnabled) { + Utils.redact(sparkConf, launchEnv.toSeq).foreach { case (k, v) => + logDebug(s" $k -> $v") + } + } logDebug(" resources:") localResources.foreach { case (k, v) => logDebug(s" $k -> $v")} logDebug(" command:") @@ -1172,24 +1180,6 @@ private[spark] class Client( private object Client extends Logging { - def main(argStrings: Array[String]) { - if (!sys.props.contains("SPARK_SUBMIT")) { - logWarning("WARNING: This client is deprecated and will be removed in a " + - "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"") - } - - // Set an env variable indicating we are running in YARN mode. - // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes - System.setProperty("SPARK_YARN_MODE", "true") - val sparkConf = new SparkConf - // SparkSubmit would use yarn cache to distribute files & jars in yarn mode, - // so remove them from sparkConf here for yarn mode. - sparkConf.remove("spark.jars") - sparkConf.remove("spark.files") - val args = new ClientArguments(argStrings) - new Client(args, sparkConf).run() - } - // Alias for the user jar val APP_JAR_NAME: String = "__app__.jar" @@ -1493,3 +1483,16 @@ private object Client extends Logging { } } + +private[spark] class YarnClusterApplication extends SparkApplication { + + override def start(args: Array[String], conf: SparkConf): Unit = { + // SparkSubmit would use yarn cache to distribute files & jars in yarn mode, + // so remove them from sparkConf here for yarn mode. + conf.remove("spark.jars") + conf.remove("spark.files") + + new Client(new ClientArguments(args), conf).run() + } + +} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 3f4d236571ffd..2886bd1964a60 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -245,6 +245,10 @@ private[yarn] class ExecutorRunnable( } } + if (securityMgr.getSecretKey() != null) { + env(SecurityManager.ENV_AUTH_SECRET) = securityMgr.getSecretKey() + } + System.getenv().asScala.filterKeys(_.startsWith("SPARK")) .foreach { case (k, v) => env(k) = v } env diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 72f4d273ab53b..c1ae12aabb8cc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -92,7 +92,7 @@ private[spark] class YarnRMClient extends Logging { /** Returns the attempt ID. */ def getAttemptId(): ApplicationAttemptId = { - YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId() + YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId() } /** Returns the configuration for the AmIpFilter to add to the Spark UI. */ diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 3d9f99f57bed7..bf4eaf5e5530a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -17,21 +17,14 @@ package org.apache.spark.deploy.yarn -import java.nio.charset.StandardCharsets.UTF_8 -import java.util.regex.Matcher -import java.util.regex.Pattern +import java.util.regex.{Matcher, Pattern} import scala.collection.mutable.{HashMap, ListBuffer} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.{JobConf, Master} -import org.apache.hadoop.security.Credentials -import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} -import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.spark.{SecurityManager, SparkConf, SparkException} @@ -43,87 +36,10 @@ import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils - -/** - * Contains util methods to interact with Hadoop from spark. - */ -class YarnSparkHadoopUtil extends SparkHadoopUtil { +object YarnSparkHadoopUtil { private var credentialUpdater: CredentialUpdater = _ - override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { - dest.addCredentials(source.getCredentials()) - } - - // Note that all params which start with SPARK are propagated all the way through, so if in yarn - // mode, this MUST be set to true. - override def isYarnMode(): Boolean = { true } - - // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop - // subsystems. Always create a new config, don't reuse yarnConf. - override def newConfiguration(conf: SparkConf): Configuration = { - val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) - hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) - hadoopConf - } - - // Add any user credentials to the job conf which are necessary for running on a secure Hadoop - // cluster - override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials() - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) - } - - override def addSecretKeyToUserCredentials(key: String, secret: String) { - val creds = new Credentials() - creds.addSecretKey(new Text(key), secret.getBytes(UTF_8)) - addCurrentUserCredentials(creds) - } - - override def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { - val credentials = getCurrentUserCredentials() - if (credentials != null) credentials.getSecretKey(new Text(key)) else null - } - - private[spark] override def startCredentialUpdater(sparkConf: SparkConf): Unit = { - val hadoopConf = newConfiguration(sparkConf) - val credentialManager = new YARNHadoopDelegationTokenManager( - sparkConf, - hadoopConf, - conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) - credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) - credentialUpdater.start() - } - - private[spark] override def stopCredentialUpdater(): Unit = { - if (credentialUpdater != null) { - credentialUpdater.stop() - credentialUpdater = null - } - } - - private[spark] def getContainerId: ContainerId = { - val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) - ConverterUtils.toContainerId(containerIdString) - } - - /** The filesystems for which YARN should fetch delegation tokens. */ - private[spark] def hadoopFSsToAccess( - sparkConf: SparkConf, - hadoopConf: Configuration): Set[FileSystem] = { - val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) - .map(new Path(_).getFileSystem(hadoopConf)) - .toSet - - val stagingFS = sparkConf.get(STAGING_DIR) - .map(new Path(_).getFileSystem(hadoopConf)) - .getOrElse(FileSystem.get(hadoopConf)) - - filesystemsToAccess + stagingFS - } -} - -object YarnSparkHadoopUtil { // Additional memory overhead // 10% was arrived at experimentally. In the interest of minimizing memory waste while covering // the common cases. Memory overhead tends to grow with container size. @@ -139,14 +55,6 @@ object YarnSparkHadoopUtil { // request types (like map/reduce in hadoop for example) val RM_REQUEST_PRIORITY = Priority.newInstance(1) - def get: YarnSparkHadoopUtil = { - val yarnMode = java.lang.Boolean.parseBoolean( - System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) - if (!yarnMode) { - throw new SparkException("YarnSparkHadoopUtil is not available in non-YARN mode!") - } - SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil] - } /** * Add a path variable to the given environment map. * If the map already contains this key, append the value to the existing value instead. @@ -301,5 +209,42 @@ object YarnSparkHadoopUtil { conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors) } } -} + def getContainerId: ContainerId = { + val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) + ConverterUtils.toContainerId(containerIdString) + } + + /** The filesystems for which YARN should fetch delegation tokens. */ + def hadoopFSsToAccess( + sparkConf: SparkConf, + hadoopConf: Configuration): Set[FileSystem] = { + val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) + .map(new Path(_).getFileSystem(hadoopConf)) + .toSet + + val stagingFS = sparkConf.get(STAGING_DIR) + .map(new Path(_).getFileSystem(hadoopConf)) + .getOrElse(FileSystem.get(hadoopConf)) + + filesystemsToAccess + stagingFS + } + + def startCredentialUpdater(sparkConf: SparkConf): Unit = { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf) + val credentialManager = new YARNHadoopDelegationTokenManager( + sparkConf, + hadoopConf, + conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf)) + credentialUpdater = new CredentialUpdater(sparkConf, hadoopConf, credentialManager) + credentialUpdater.start() + } + + def stopCredentialUpdater(): Unit = { + if (credentialUpdater != null) { + credentialUpdater.stop() + credentialUpdater = null + } + } + +} diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala index 68a2e9e70a78b..a03f9813181fe 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/AMCredentialRenewer.scala @@ -62,7 +62,7 @@ private[yarn] class AMCredentialRenewer( Executors.newSingleThreadScheduledExecutor( ThreadUtils.namedThreadFactory("Credential Refresh Thread")) - private val hadoopUtil = YarnSparkHadoopUtil.get + private val hadoopUtil = SparkHadoopUtil.get private val credentialsFile = sparkConf.get(CREDENTIALS_FILE_PATH) private val daysToKeepFiles = sparkConf.get(CREDENTIALS_FILE_MAX_RETENTION) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index d482376d14dd7..8187eb50f361f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -66,7 +66,7 @@ private[spark] class YarnClientSchedulerBackend( // reads the credentials from HDFS, just like the executors and updates its own credentials // cache. if (conf.contains("spark.yarn.credentials.file")) { - YarnSparkHadoopUtil.get.startCredentialUpdater(conf) + YarnSparkHadoopUtil.startCredentialUpdater(conf) } monitorThread = asyncMonitorApplication() monitorThread.start() @@ -153,7 +153,7 @@ private[spark] class YarnClientSchedulerBackend( client.reportLauncherState(SparkAppHandle.State.FINISHED) super.stop() - YarnSparkHadoopUtil.get.stopCredentialUpdater() + YarnSparkHadoopUtil.stopCredentialUpdater() client.stop() logInfo("Stopped") } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 4f3d5ebf403e0..2c14b254ffa5a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -41,7 +41,7 @@ private[spark] class YarnClusterSchedulerBackend( var driverLogs: Option[Map[String, String]] = None try { val yarnConf = new YarnConfiguration(sc.hadoopConfiguration) - val containerId = YarnSparkHadoopUtil.get.getContainerId + val containerId = YarnSparkHadoopUtil.getContainerId val httpAddress = System.getenv(Environment.NM_HOST.name()) + ":" + System.getenv(Environment.NM_HTTP_PORT.name()) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 6cf68427921fd..c646cc8b40dea 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -185,7 +185,6 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll } test("configuration and args propagate through createApplicationSubmissionContext") { - val conf = new Configuration() // When parsing tags, duplicates and leading/trailing whitespace should be removed. // Spaces between non-comma strings should be preserved as single tags. Empty strings may or // may not be removed depending on the version of Hadoop being used. @@ -200,7 +199,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) - val client = new Client(args, conf, sparkConf) + val client = new Client(args, sparkConf) client.createApplicationSubmissionContext( new YarnClientApplication(getNewApplicationResponse, appContext), containerLaunchContext) @@ -407,15 +406,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll private def createClient( sparkConf: SparkConf, - conf: Configuration = new Configuration(), args: Array[String] = Array()): Client = { val clientArgs = new ClientArguments(args) - spy(new Client(clientArgs, conf, sparkConf)) + spy(new Client(clientArgs, sparkConf)) } private def classpath(client: Client): Array[String] = { val env = new MutableHashMap[String, String]() - populateClasspath(null, client.hadoopConf, client.sparkConf, env) + populateClasspath(null, new Configuration(), client.sparkConf, env) classpath(env) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index d5de19072ce29..ab0005d7b53a8 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -115,8 +115,13 @@ class YarnClusterSuite extends BaseYarnClusterSuite { )) } - test("run Spark in yarn-cluster mode with using SparkHadoopUtil.conf") { - testYarnAppUseSparkHadoopUtilConf() + test("yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414)") { + val result = File.createTempFile("result", null, tempDir) + val finalState = runSpark(false, + mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass), + appArgs = Seq("key=value", result.getAbsolutePath()), + extraConf = Map("spark.hadoop.key" -> "value")) + checkResult(finalState, result) } test("run Spark in yarn-client mode with additional jar") { @@ -216,15 +221,6 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, result) } - private def testYarnAppUseSparkHadoopUtilConf(): Unit = { - val result = File.createTempFile("result", null, tempDir) - val finalState = runSpark(false, - mainClassName(YarnClusterDriverUseSparkHadoopUtilConf.getClass), - appArgs = Seq("key=value", result.getAbsolutePath()), - extraConf = Map("spark.hadoop.key" -> "value")) - checkResult(finalState, result) - } - private def testWithAddJar(clientMode: Boolean): Unit = { val originalJar = TestUtils.createJarWithFiles(Map("test.resource" -> "ORIGINAL"), tempDir) val driverResult = File.createTempFile("driver", null, tempDir) @@ -424,7 +420,7 @@ private object YarnClusterDriver extends Logging with Matchers { s"Driver logs contain sensitive info (${SECRET_PASSWORD}): \n${log} " ) } - val containerId = YarnSparkHadoopUtil.get.getContainerId + val containerId = YarnSparkHadoopUtil.getContainerId val user = Utils.getCurrentUserName() assert(urlStr.endsWith(s"/node/containerlogs/$containerId/$user/stderr?start=-4096")) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index a057618b39950..f21353aa007c8 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -71,14 +71,10 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging test("Yarn configuration override") { val key = "yarn.nodemanager.hostname" - val default = new YarnConfiguration() - val sparkConf = new SparkConf() .set("spark.hadoop." + key, "someHostName") - val yarnConf = new YarnSparkHadoopUtil().newConfiguration(sparkConf) - - yarnConf.getClass() should be (classOf[YarnConfiguration]) - yarnConf.get(key) should not be default.get(key) + val yarnConf = new YarnConfiguration(SparkHadoopUtil.get.newConfiguration(sparkConf)) + yarnConf.get(key) should be ("someHostName") } @@ -145,45 +141,4 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } - test("check different hadoop utils based on env variable") { - try { - System.setProperty("SPARK_YARN_MODE", "true") - assert(SparkHadoopUtil.get.getClass === classOf[YarnSparkHadoopUtil]) - System.setProperty("SPARK_YARN_MODE", "false") - assert(SparkHadoopUtil.get.getClass === classOf[SparkHadoopUtil]) - } finally { - System.clearProperty("SPARK_YARN_MODE") - } - } - - - - // This test needs to live here because it depends on isYarnMode returning true, which can only - // happen in the YARN module. - test("security manager token generation") { - try { - System.setProperty("SPARK_YARN_MODE", "true") - val initial = SparkHadoopUtil.get - .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY) - assert(initial === null || initial.length === 0) - - val conf = new SparkConf() - .set(SecurityManager.SPARK_AUTH_CONF, "true") - .set(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused") - val sm = new SecurityManager(conf) - - val generated = SparkHadoopUtil.get - .getSecretKeyFromUserCredentials(SecurityManager.SECRET_LOOKUP_KEY) - assert(generated != null) - val genString = new Text(generated).toString() - assert(genString != "unused") - assert(sm.getSecretKey() === genString) - } finally { - // removeSecretKey() was only added in Hadoop 2.6, so instead we just set the secret - // to an empty string. - SparkHadoopUtil.get.addSecretKeyToUserCredentials(SecurityManager.SECRET_LOOKUP_KEY, "") - System.clearProperty("SPARK_YARN_MODE") - } - } - } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index c918998bde07c..edde884d7a06f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -48,7 +48,7 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, hadoopConf, - conf => YarnSparkHadoopUtil.get.hadoopFSsToAccess(sparkConf, conf)) + conf => YarnSparkHadoopUtil.hadoopFSsToAccess(sparkConf, conf)) credentialManager.credentialProviders.get("yarn-test") should not be (None) } From cee6be231310b89d35d5b419bd30153f29dd4cb9 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 1 Nov 2017 12:45:12 -0700 Subject: [PATCH 02/10] Add mima excludes. SparkHadoopUtil is not documented as a public class in Spark's API docs, so just added new exclusions. --- project/MimaExcludes.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 45b8870f3b62f..a2d903fd63be9 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -36,6 +36,12 @@ object MimaExcludes { // Exclude rules for 2.3.x lazy val v23excludes = v22excludes ++ Seq( + // SPARK-22372: Make cluster submission use SparkApplication. + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getSecretKeyFromUserCredentials"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.isYarnMode"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getCurrentUserCredentials"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.addSecretKeyToUserCredentials"), + // SPARK-18085: Better History Server scalability for many / large applications ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"), // [SPARK-20495][SQL] Add StorageLevel to cacheTable API From cb6c8ffe9eb04e05b8cd982e123ae962bd2e7373 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 7 Nov 2017 09:55:31 -0800 Subject: [PATCH 03/10] Remove stale comment. --- core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 5a08084d270f9..6f74c4d08ba56 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -368,8 +368,6 @@ object SparkSubmit extends CommandLineUtils with Logging { args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull - // This security manager will not need an auth secret, but set a dummy value in case - // spark.authenticate is enabled, otherwise an exception is thrown. lazy val secMgr = new SecurityManager(sparkConf) // In client mode, download remote files. From 593eb6b67c871706d7f4b418dfa6fb111ec06fde Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 14 Nov 2017 10:16:59 -0800 Subject: [PATCH 04/10] Rename constants. --- .../org/apache/spark/deploy/SparkSubmit.scala | 17 +++++++++-------- .../apache/spark/deploy/SparkSubmitSuite.scala | 8 ++++---- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 6f74c4d08ba56..cfcdce648d330 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -93,9 +93,10 @@ object SparkSubmit extends CommandLineUtils with Logging { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 // Following constants are visible for testing. - private[deploy] val YARN_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication" - private[deploy] val REST_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() - private[deploy] val STANDALONE_SUBMIT_CLASS = classOf[ClientApp].getName() + private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = + "org.apache.spark.deploy.yarn.YarnClusterApplication" + private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() + private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() // scalastyle:off println private[spark] def printVersionAndExit(): Unit = { @@ -286,7 +287,7 @@ object SparkSubmit extends CommandLineUtils with Logging { } // Make sure YARN is included in our build if we're trying to use it - if (!Utils.classIsLoadable(YARN_SUBMIT_CLASS) && !Utils.isTesting) { + if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) { printErrorAndExit( "Could not load YARN classes. " + "This copy of Spark may not have been compiled with YARN support.") @@ -634,11 +635,11 @@ object SparkSubmit extends CommandLineUtils with Logging { // All Spark parameters are expected to be passed to the client through system properties. if (args.isStandaloneCluster) { if (args.useRest) { - childMainClass = REST_SUBMIT_CLASS + childMainClass = REST_CLUSTER_SUBMIT_CLASS childArgs += (args.primaryResource, args.mainClass) } else { // In legacy standalone cluster mode, use Client as a wrapper around the user class - childMainClass = STANDALONE_SUBMIT_CLASS + childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS if (args.supervise) { childArgs += "--supervise" } Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) } Option(args.driverCores).foreach { c => childArgs += ("--cores", c) } @@ -663,7 +664,7 @@ object SparkSubmit extends CommandLineUtils with Logging { // In yarn-cluster mode, use yarn.Client as a wrapper around the user class if (isYarnCluster) { - childMainClass = YARN_SUBMIT_CLASS + childMainClass = YARN_CLUSTER_SUBMIT_CLASS if (args.isPython) { childArgs += ("--primary-py-file", args.primaryResource) childArgs += ("--class", "org.apache.spark.deploy.PythonRunner") @@ -684,7 +685,7 @@ object SparkSubmit extends CommandLineUtils with Logging { if (isMesosCluster) { assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API") - childMainClass = REST_SUBMIT_CLASS + childMainClass = REST_CLUSTER_SUBMIT_CLASS if (args.isPython) { // Second argument is main class childArgs += (args.primaryResource, "") diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 1c813532f3582..dea2e2b87e394 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -232,7 +232,7 @@ class SparkSubmitSuite childArgsStr should include ("--class org.SomeClass") childArgsStr should include ("--arg arg1 --arg arg2") childArgsStr should include regex ("--jar .*thejar.jar") - mainClass should be (SparkSubmit.YARN_SUBMIT_CLASS) + mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS) // In yarn cluster mode, also adding jars to classpath classpath(0) should endWith ("thejar.jar") @@ -320,11 +320,11 @@ class SparkSubmitSuite val childArgsStr = childArgs.mkString(" ") if (useRest) { childArgsStr should endWith ("thejar.jar org.SomeClass arg1 arg2") - mainClass should be (SparkSubmit.REST_SUBMIT_CLASS) + mainClass should be (SparkSubmit.REST_CLUSTER_SUBMIT_CLASS) } else { childArgsStr should startWith ("--supervise --memory 4g --cores 5") childArgsStr should include regex "launch spark://h:p .*thejar.jar org.SomeClass arg1 arg2" - mainClass should be (SparkSubmit.STANDALONE_SUBMIT_CLASS) + mainClass should be (SparkSubmit.STANDALONE_CLUSTER_SUBMIT_CLASS) } classpath should have size 0 sys.props("SPARK_SUBMIT") should be ("true") @@ -399,7 +399,7 @@ class SparkSubmitSuite conf.get("spark.executor.memory") should be ("5g") conf.get("spark.master") should be ("yarn") conf.get("spark.submit.deployMode") should be ("cluster") - mainClass should be (SparkSubmit.YARN_SUBMIT_CLASS) + mainClass should be (SparkSubmit.YARN_CLUSTER_SUBMIT_CLASS) } test("SPARK-21568 ConsoleProgressBar should be enabled only in shells") { From 121bcf8a2758858cad8e88e7fb7d78566494765b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 14 Nov 2017 18:33:24 -0800 Subject: [PATCH 05/10] Correctly filter out SPARK_AUTH_SECRET_CONF. --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 3c23ef723e317..f2d128437e428 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -742,17 +742,14 @@ private[spark] class Client( hadoopConf.writeXml(confStream) confStream.closeEntry() - // Save Spark configuration to a file in the archive. + // Save Spark configuration to a file in the archive, but filter out the app's secret. val props = new Properties() - sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } + sparkConf.getAll.foreach { case (k, v) if k != SecurityManager.SPARK_AUTH_SECRET_CONF => + props.setProperty(k, v) + } // Override spark.yarn.key to point to the location in distributed cache which will be used // by AM. - Option(amKeytabFileName).foreach { k => - // Do not propagate the app's secret using the config file. - if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) { - props.setProperty(KEYTAB.key, k) - } - } + Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) } confStream.putNextEntry(new ZipEntry(SPARK_CONF_FILE)) val writer = new OutputStreamWriter(confStream, StandardCharsets.UTF_8) props.store(writer, "Spark configuration.") From 08f47ca3fb54315c537b1134e31e0a1a912c285e Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 14 Nov 2017 18:41:49 -0800 Subject: [PATCH 06/10] Cleanup SPARK_YARN_MODE in tests. --- .../deploy/yarn/BaseYarnClusterSuite.scala | 5 ---- .../spark/deploy/yarn/ClientSuite.scala | 23 +++---------------- ...ARNHadoopDelegationTokenManagerSuite.scala | 9 -------- 3 files changed, 3 insertions(+), 34 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala index 9c3b18e4ec5f3..ac67f2196e0a0 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala @@ -62,18 +62,14 @@ abstract class BaseYarnClusterSuite protected var hadoopConfDir: File = _ private var logConfDir: File = _ - var oldSystemProperties: Properties = null - def newYarnConfig(): YarnConfiguration override def beforeAll() { super.beforeAll() - oldSystemProperties = SerializationUtils.clone(System.getProperties) tempDir = Utils.createTempDir() logConfDir = new File(tempDir, "log4j") logConfDir.mkdir() - System.setProperty("SPARK_YARN_MODE", "true") val logConfFile = new File(logConfDir, "log4j.properties") Files.write(LOG4J_CONF, logConfFile, StandardCharsets.UTF_8) @@ -124,7 +120,6 @@ abstract class BaseYarnClusterSuite try { yarnCluster.stop() } finally { - System.setProperties(oldSystemProperties) super.afterAll() } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index c646cc8b40dea..9d5f5eb621118 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -24,7 +24,6 @@ import java.util.Properties import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap => MutableHashMap} -import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig @@ -36,34 +35,18 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records import org.mockito.Matchers.{eq => meq, _} import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfterAll, Matchers} +import org.scalatest.Matchers import org.apache.spark.{SparkConf, SparkFunSuite, TestUtils} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.util.{ResetSystemProperties, SparkConfWithEnv, Utils} +import org.apache.spark.util.{SparkConfWithEnv, Utils} -class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll - with ResetSystemProperties { +class ClientSuite extends SparkFunSuite with Matchers { import Client._ var oldSystemProperties: Properties = null - override def beforeAll(): Unit = { - super.beforeAll() - oldSystemProperties = SerializationUtils.clone(System.getProperties) - System.setProperty("SPARK_YARN_MODE", "true") - } - - override def afterAll(): Unit = { - try { - System.setProperties(oldSystemProperties) - oldSystemProperties = null - } finally { - super.afterAll() - } - } - test("default Yarn application classpath") { getDefaultYarnApplicationClasspath should be(Fixtures.knownDefYarnAppCP) } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala index edde884d7a06f..3c7cdc0f1dab8 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/security/YARNHadoopDelegationTokenManagerSuite.scala @@ -31,19 +31,10 @@ class YARNHadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers override def beforeAll(): Unit = { super.beforeAll() - - System.setProperty("SPARK_YARN_MODE", "true") - sparkConf = new SparkConf() hadoopConf = new Configuration() } - override def afterAll(): Unit = { - super.afterAll() - - System.clearProperty("SPARK_YARN_MODE") - } - test("Correctly loads credential providers") { credentialManager = new YARNHadoopDelegationTokenManager( sparkConf, From 2129ccb13759fbc7784ba31e7c0e6aa08904cb42 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 16 Nov 2017 11:39:28 -0800 Subject: [PATCH 07/10] Propagate secret to executors from AM to avoid writing it to disk. --- core/src/main/scala/org/apache/spark/SecurityManager.scala | 1 - .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 5 +++++ .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 ++++-- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 5fe7e54560739..c805c629e56b2 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -542,7 +542,6 @@ private[spark] class SecurityManager( val secret = HashCodes.fromBytes(secretBytes).toString() sparkConf.set(SPARK_AUTH_SECRET_CONF, secret) - sparkConf.setExecutorEnv(ENV_AUTH_SECRET, secret) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index f8cc8a10d9c62..7a5d036aee435 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -72,6 +72,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends securityMgr.initializeAuth() } + // If an auth secret is configured, propagate it to executors. + Option(securityMgr.getSecretKey()).foreach { secret => + sparkConf.setExecutorEnv(SecurityManager.ENV_AUTH_SECRET, secret) + } + // Set system properties for each config entry. This covers two use cases: // - The default configuration stored by the SparkHadoopUtil class // - The user application creating a new SparkConf in cluster mode diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index f2d128437e428..768960bed9abc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -744,8 +744,10 @@ private[spark] class Client( // Save Spark configuration to a file in the archive, but filter out the app's secret. val props = new Properties() - sparkConf.getAll.foreach { case (k, v) if k != SecurityManager.SPARK_AUTH_SECRET_CONF => - props.setProperty(k, v) + sparkConf.getAll.foreach { case (k, v) => + if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) { + props.setProperty(k, v) + } } // Override spark.yarn.key to point to the location in distributed cache which will be used // by AM. From 86f0bf8dcdd4a3dfc43f0e1f815b348f3477b250 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 16 Nov 2017 14:51:33 -0800 Subject: [PATCH 08/10] Unbreak test. --- .../test/scala/org/apache/spark/SecurityManagerSuite.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index c37925046b990..1b24997d421dc 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -443,10 +443,6 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { val mgr = new SecurityManager(conf) mgr.initializeAuth() assert(mgr.getSecretKey() != null) - - val confVal = conf.get(SecurityManager.SPARK_AUTH_SECRET_CONF) - val envVal = conf.get("spark.executorEnv." + SecurityManager.ENV_AUTH_SECRET) - assert(confVal === envVal) } } From 515f8f8b1d2ebf56171df5372f13250d09415d2b Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 30 Nov 2017 14:30:50 -0800 Subject: [PATCH 09/10] Feedback. --- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 5 ----- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 54254c4887b57..65a7633275e5a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -72,11 +72,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends securityMgr.initializeAuth() } - // If an auth secret is configured, propagate it to executors. - Option(securityMgr.getSecretKey()).foreach { secret => - sparkConf.setExecutorEnv(SecurityManager.ENV_AUTH_SECRET, secret) - } - // Set system properties for each config entry. This covers two use cases: // - The default configuration stored by the SparkHadoopUtil class // - The user application creating a new SparkConf in cluster mode diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 227f722c5aaec..80bc5026d448f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -835,7 +835,7 @@ private[spark] class Client( } sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _)) } else { - // Propagate the auth secret to the AM using the environment, if set. + // Only propagate the auth secret to the AM in client mode, using the environment, if set. sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF).foreach { secret => env(SecurityManager.ENV_AUTH_SECRET) = secret } From c752453b2a379f301d52692bfc639bb631520069 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 30 Nov 2017 16:17:26 -0800 Subject: [PATCH 10/10] Go back to propagating secret using credentials. --- .../org/apache/spark/SecurityManager.scala | 36 ++++++++++--------- .../apache/spark/SecurityManagerSuite.scala | 18 ++++++++-- .../spark/deploy/yarn/ApplicationMaster.scala | 5 --- .../org/apache/spark/deploy/yarn/Client.scala | 9 +---- .../spark/deploy/yarn/ExecutorRunnable.scala | 4 --- 5 files changed, 36 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index c805c629e56b2..4c1dbe3ffb4ad 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -19,12 +19,15 @@ package org.apache.spark import java.lang.{Byte => JByte} import java.net.{Authenticator, PasswordAuthentication} +import java.nio.charset.StandardCharsets.UTF_8 import java.security.{KeyStore, SecureRandom} import java.security.cert.X509Certificate import javax.net.ssl._ import com.google.common.hash.HashCodes import com.google.common.io.Files +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.{Credentials, UserGroupInformation} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -498,7 +501,10 @@ private[spark] class SecurityManager( */ def getSecretKey(): String = { if (isAuthenticationEnabled) { - Option(sparkConf.getenv(ENV_AUTH_SECRET)) + val creds = UserGroupInformation.getCurrentUser().getCredentials() + Option(creds.getSecretKey(SECRET_LOOKUP_KEY)) + .map { bytes => new String(bytes, UTF_8) } + .orElse(Option(sparkConf.getenv(ENV_AUTH_SECRET))) .orElse(sparkConf.getOption(SPARK_AUTH_SECRET_CONF)) .getOrElse { throw new IllegalArgumentException( @@ -510,12 +516,11 @@ private[spark] class SecurityManager( } /** - * Initialize the configuration object held by this class for authentication. + * Initialize the authentication secret. * * If authentication is disabled, do nothing. * - * In YARN mode, generate a secret key and store it in the configuration object, setting it up to - * also be propagated to executors using an env variable. + * In YARN mode, generate a new secret and store it in the current user's credentials. * * In other modes, assert that the auth secret is set in the configuration. */ @@ -530,19 +535,14 @@ private[spark] class SecurityManager( return } - // In YARN, force creation of a new secret if this is client mode. This ensures each - // YARN app uses a different secret. For cluster mode, this relies on YARN's client to - // not propagate the secret to the driver, which will then generate a new one. - val deployMode = sparkConf.get(SparkLauncher.DEPLOY_MODE, "client") - if (!sparkConf.contains(SPARK_AUTH_SECRET_CONF) || deployMode == "client") { - val rnd = new SecureRandom() - val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE - val secretBytes = new Array[Byte](length) - rnd.nextBytes(secretBytes) - - val secret = HashCodes.fromBytes(secretBytes).toString() - sparkConf.set(SPARK_AUTH_SECRET_CONF, secret) - } + val rnd = new SecureRandom() + val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE + val secretBytes = new Array[Byte](length) + rnd.nextBytes(secretBytes) + + val creds = new Credentials() + creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes) + UserGroupInformation.getCurrentUser().addCredentials(creds) } // Default SecurityManager only has a single secret key, so ignore appId. @@ -558,4 +558,6 @@ private[spark] object SecurityManager { // value as SPARK_AUTH_SECRET_CONF set in SparkConf val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET" + // key used to store the spark secret in the Hadoop UGI + val SECRET_LOOKUP_KEY = new Text("sparkCookie") } diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala index 1b24997d421dc..cf59265dd646d 100644 --- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala @@ -18,6 +18,10 @@ package org.apache.spark import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 +import java.security.PrivilegedExceptionAction + +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.internal.config._ import org.apache.spark.launcher.SparkLauncher @@ -441,8 +445,18 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties { .set(NETWORK_AUTH_ENABLED, true) .set(SparkLauncher.SPARK_MASTER, "yarn") val mgr = new SecurityManager(conf) - mgr.initializeAuth() - assert(mgr.getSecretKey() != null) + + UserGroupInformation.createUserForTesting("authTest", Array()).doAs( + new PrivilegedExceptionAction[Unit]() { + override def run(): Unit = { + mgr.initializeAuth() + val creds = UserGroupInformation.getCurrentUser().getCredentials() + val secret = creds.getSecretKey(SecurityManager.SECRET_LOOKUP_KEY) + assert(secret != null) + assert(new String(secret, UTF_8) === mgr.getSecretKey()) + } + } + ) } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 65a7633275e5a..b2576b0d72633 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -65,12 +65,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends } } - // Initialize the security manager for authentication, if enabled. This needs to be done - // before the config is propagated to the system properties. private val securityMgr = new SecurityManager(sparkConf) - if (isClusterMode) { - securityMgr.initializeAuth() - } // Set system properties for each config entry. This covers two use cases: // - The default configuration stored by the SparkHadoopUtil class diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 80bc5026d448f..3781b261a0381 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -758,9 +758,7 @@ private[spark] class Client( // Save Spark configuration to a file in the archive, but filter out the app's secret. val props = new Properties() sparkConf.getAll.foreach { case (k, v) => - if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) { - props.setProperty(k, v) - } + props.setProperty(k, v) } // Override spark.yarn.key to point to the location in distributed cache which will be used // by AM. @@ -834,11 +832,6 @@ private[spark] class Client( } } sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _)) - } else { - // Only propagate the auth secret to the AM in client mode, using the environment, if set. - sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF).foreach { secret => - env(SecurityManager.ENV_AUTH_SECRET) = secret - } } sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp => diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 2886bd1964a60..3f4d236571ffd 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -245,10 +245,6 @@ private[yarn] class ExecutorRunnable( } } - if (securityMgr.getSecretKey() != null) { - env(SecurityManager.ENV_AUTH_SECRET) = securityMgr.getSecretKey() - } - System.getenv().asScala.filterKeys(_.startsWith("SPARK")) .foreach { case (k, v) => env(k) = v } env