[SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode #19616
Closed
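For context (not part of the diff below), here is a minimal sketch of how the new option could be enabled from application code. It assumes the config key behind YARN_UNMANAGED_AM is spark.yarn.unmanagedAM.enabled and that the flag only takes effect in yarn-client mode; with it set, the ApplicationMaster runs inside the client process instead of in a YARN-allocated container.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: yarn-client mode with the AM kept inside the client JVM.
// The config key is assumed to be the one behind YARN_UNMANAGED_AM
// (spark.yarn.unmanagedAM.enabled); when enabled, no container is
// requested from YARN for the ApplicationMaster.
object UnmanagedAmClientExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unmanaged-am-example")
      .setMaster("yarn")
      .set("spark.submit.deployMode", "client")
      .set("spark.yarn.unmanagedAM.enabled", "true") // assumed key
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).sum())
    sc.stop()
  }
}
```

The same flag could equally be supplied with --conf spark.yarn.unmanagedAM.enabled=true on spark-submit.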
Changes from all commits (16 commits)
e51f99e [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client
764c302 Merge branch 'master' into SPARK-22404
640013b Adding Client changes missed while merging master
cba0c6d Resolving ApplicationMaster conflicts with the merge
19b6c3a Fixed the review comments
ce94235 Merge branch 'master' into SPARK-22404
0921f7a Merge branch 'master' into SPARK-22404
837d25f Merge branch 'master' into SPARK-22404
65aeba9 Fixing the review comments
93b016f Addressing review comments
dc31940 Review comments addressing
23ad9de Merge branch 'master' into SPARK-22404
1c02b7d Merge branch 'master' into SPARK-22404
2429e19 Changes to fix issues with master merge
6854fc4 Updated with review comments fix and added a test
3b377af Updated to use YARN_UNMANAGED_AM.key
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark._

@@ -53,36 +54,27 @@ import org.apache.spark.util._
/**
* Common application master functionality for Spark on Yarn.
*/
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private[spark] class ApplicationMaster(
args: ApplicationMasterArguments,
sparkConf: SparkConf,
yarnConf: YarnConfiguration) extends Logging {

// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.

private val appAttemptId = YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
private val isClusterMode = args.userClass != null

private val sparkConf = new SparkConf()
if (args.propertiesFile != null) {
Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
private val appAttemptId =
if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
YarnSparkHadoopUtil.getContainerId.getApplicationAttemptId()
} else {
null
}
}

private val isClusterMode = args.userClass != null

private val securityMgr = new SecurityManager(sparkConf)

private var metricsSystem: Option[MetricsSystem] = None

// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>

@@ -152,24 +144,15 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// Next wait interval before allocator poll.
private var nextAllocationInterval = initialAllocationInterval

private var rpcEnv: RpcEnv = null

// In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
private val sparkContextPromise = Promise[SparkContext]()

/**
* Load the list of localized files set by the client, used when launching executors. This should
* be called in a context where the needed credentials to access HDFS are available.
*/
private def prepareLocalResources(): Map[String, LocalResource] = {
private def prepareLocalResources(distCacheConf: SparkConf): Map[String, LocalResource] = {
logInfo("Preparing Local resources")
val distCacheConf = new SparkConf(false)
if (args.distCacheConf != null) {
Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
distCacheConf.set(k, v)
}
}

val resources = HashMap[String, LocalResource]()

def setupDistributedCache(

@@ -266,7 +249,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
// we only want to unregister if we don't want the RM to retry
if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
unregister(finalStatus, finalMsg)
cleanupStagingDir()
cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
}
}
}

@@ -298,6 +281,60 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
exitCode
}

def runUnmanaged(
clientRpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
stagingDir: Path,
cachedResourcesConf: SparkConf): Unit = {
try {
new CallerContext(
"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()

val driverRef = clientRpcEnv.setupEndpointRef(
RpcAddress(sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port").toInt),
YarnSchedulerBackend.ENDPOINT_NAME)
// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(Utils.localHostName, -1, sparkConf,
sparkConf.getOption("spark.driver.appUIAddress"), appAttemptId)
addAmIpFilter(Some(driverRef), ProxyUriUtils.getPath(appAttemptId.getApplicationId))
createAllocator(driverRef, sparkConf, clientRpcEnv, appAttemptId, cachedResourcesConf)
reporterThread.join()
} catch {
case e: Exception =>
// catch everything else if not specifically handled
logError("Uncaught exception: ", e)
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
if (!unregistered) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(stagingDir)
}
} finally {
try {
metricsSystem.foreach { ms =>
ms.report()
ms.stop()
}
} catch {
case e: Exception =>
logWarning("Exception during stopping of the metric system: ", e)
}
}
}

def stopUnmanaged(stagingDir: Path): Unit = {
if (!finished) {
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}
if (!unregistered) {
unregister(finalStatus, finalMsg)
cleanupStagingDir(stagingDir)
}
}

/**
* Set the default final application status for client mode to UNDEFINED to handle
* if YARN HA restarts the application so that it properly retries. Set the final

@@ -375,17 +412,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
host: String,
port: Int,
_sparkConf: SparkConf,
uiAddress: Option[String]): Unit = {
val appId = appAttemptId.getApplicationId().toString()
val attemptId = appAttemptId.getAttemptId().toString()
uiAddress: Option[String],
appAttempt: ApplicationAttemptId): Unit = {
val appId = appAttempt.getApplicationId().toString()
val attemptId = appAttempt.getAttemptId().toString()
val historyAddress = ApplicationMaster
.getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
registered = true
}

private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
private def createAllocator(
driverRef: RpcEndpointRef,
_sparkConf: SparkConf,
rpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
distCacheConf: SparkConf): Unit = {
// In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
// always contact the driver to get the current set of valid tokens, so that local resources can
// be initialized below.

@@ -399,7 +442,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val localResources = prepareLocalResources()
val localResources = prepareLocalResources(distCacheConf)

// Before we initialize the allocator, let's log the information about how executors will
// be run up front, to avoid printing this out for every single executor being launched.

@@ -437,7 +480,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

private def runDriver(): Unit = {
addAmIpFilter(None)
addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
userClassThread = startUserApplication()

// This a bit hacky, but we need to wait until the spark.driver.port property has

@@ -448,17 +491,17 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
val rpcEnv = sc.env.rpcEnv

val userConf = sc.getConf
val host = userConf.get(DRIVER_HOST_ADDRESS)
val port = userConf.get(DRIVER_PORT)
registerAM(host, port, userConf, sc.ui.map(_.webUrl))
registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
createAllocator(driverRef, userConf)
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.

@@ -482,20 +525,21 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
private def runExecutorLauncher(): Unit = {
val hostname = Utils.localHostName
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
val rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)

// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS), appAttemptId)

// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf)
addAmIpFilter(Some(driverRef),
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
createAllocator(driverRef, sparkConf, rpcEnv, appAttemptId, distCacheConf)

// In client mode the actor will stop the reporter thread.
reporterThread.join()

@@ -591,15 +635,23 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
t
}

private def distCacheConf(): SparkConf = {
val distCacheConf = new SparkConf(false)
if (args.distCacheConf != null) {
Utils.getPropertiesFromFile(args.distCacheConf).foreach { case (k, v) =>
distCacheConf.set(k, v)
}
}
distCacheConf
}

/**
* Clean up the staging directory.
*/
private def cleanupStagingDir(): Unit = {
var stagingDirPath: Path = null
private def cleanupStagingDir(stagingDirPath: Path): Unit = {
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
logInfo("Deleting staging directory " + stagingDirPath)
val fs = stagingDirPath.getFileSystem(yarnConf)
fs.delete(stagingDirPath, true)

@@ -611,8 +663,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

/** Add the Yarn IP filter that is required for properly securing the UI. */
private def addAmIpFilter(driver: Option[RpcEndpointRef]) = {
val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
private def addAmIpFilter(driver: Option[RpcEndpointRef], proxyBase: String) = {
val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
val params = client.getAmIpFilterParams(yarnConf, proxyBase)
driver match {

@@ -742,9 +793,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
// In cluster mode, do not rely on the disassociated event to exit
// In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
Contributor (inline review comment on this line): Update comment above?
logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
}

@@ -770,12 +821,28 @@ object ApplicationMaster extends Logging {
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
val sparkConf = new SparkConf()
if (amArgs.propertiesFile != null) {
Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
sparkConf.set(k, v)
}
}
// Set system properties for each config entry. This covers two use cases:
// - The default configuration stored by the SparkHadoopUtil class
// - The user application creating a new SparkConf in cluster mode
//
// Both cases create a new SparkConf object which reads these configs from system properties.
sparkConf.getAll.foreach { case (k, v) =>
sys.props(k) = v
}

val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

val ugi = master.sparkConf.get(PRINCIPAL) match {
val ugi = sparkConf.get(PRINCIPAL) match {
case Some(principal) =>
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
SparkHadoopUtil.get.loginUserFromKeytab(principal, master.sparkConf.get(KEYTAB).orNull)
SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
val newUGI = UserGroupInformation.getCurrentUser()
// Transfer the original user's tokens to the new user, since it may contain needed tokens
// (such as those user to connect to YARN).
Comment: Is this code needed here? Won't it be called when the client calls stopUnmanaged?

Reply: appMaster.runUnmanaged runs in a daemon thread. If something unexpected happens inside appMaster.runUnmanaged, that daemon thread stops, but the monitor thread does not learn about it and keeps seeing the status as ACCEPTED/RUNNING. This code unregisters with the RM so that the client/monitor thread gets an application report with status FAILED and stops its services, including sc.
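To make that argument concrete, here is a simplified, self-contained sketch; the names (UnmanagedAmFailureDemo, doAmWork, Status) are hypothetical and not Spark's API. The monitor loop only sees a shared status, so a crash inside the daemon thread has to update that status explicitly, which mirrors why runUnmanaged must unregister with the RM so the client-side monitor observes FAILED rather than a perpetual ACCEPTED/RUNNING.

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustration only; none of these names are Spark's API. The monitor
// loop below only sees the shared status, so a failure inside the daemon
// thread must flip that status itself, analogous to runUnmanaged
// unregistering with the RM on an uncaught exception.
object UnmanagedAmFailureDemo {
  sealed trait Status
  case object Running extends Status
  case object Failed extends Status
  case object Succeeded extends Status

  // Stand-in for the work done by appMaster.runUnmanaged.
  private def doAmWork(): Unit = throw new RuntimeException("AM crashed")

  def main(args: Array[String]): Unit = {
    val status = new AtomicReference[Status](Running)

    val amThread = new Thread(() => {
      try {
        doAmWork()
        status.set(Succeeded)
      } catch {
        case _: Exception =>
          // Without this, the monitor below would poll Running forever.
          status.set(Failed)
      }
    })
    amThread.setDaemon(true)
    amThread.start()

    // Stand-in for the client's monitor thread polling the application report.
    while (status.get() == Running) {
      Thread.sleep(100)
    }
    println(s"final status: ${status.get()}")
  }
}
```

This is also why the cleanup in the catch block is not redundant with stopUnmanaged: per the discussion above, stopUnmanaged is driven by the client, which would otherwise keep waiting on a status that never changes.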