
Conversation

@devaraj-kavali

@devaraj-kavali devaraj-kavali commented Oct 31, 2017

What changes were proposed in this pull request?

This adds a new configuration, "spark.yarn.un-managed-am" (defaults to false), to enable an unmanaged AM in YARN client mode, which launches the Application Master service as part of the Client. It reuses the existing Application Master <-> Task Scheduler communication code for container requests, allocations, and launches, and eliminates the following:

  1. Allocating and launching the Application Master container
  2. Remote Node/Process communication between Application Master <-> Task Scheduler
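
For context, a minimal usage sketch (not part of the PR) of how an application might opt in, using the config key as proposed here; the key is renamed later in the review:

    import org.apache.spark.{SparkConf, SparkContext}

    // Assumes a YARN client-mode submission; "spark.yarn.un-managed-am" is the key as
    // originally proposed and is renamed during review.
    val conf = new SparkConf()
      .setAppName("unmanaged-am-example")
      .setMaster("yarn")
      .set("spark.submit.deployMode", "client")
      .set("spark.yarn.un-managed-am", "true")
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100).count()
    sc.stop()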

How was this patch tested?

I verified this manually by running applications in yarn-client mode with "spark.yarn.un-managed-am" enabled, and also confirmed that the existing execution flows are not affected.

I would like to hear others' feedback/thoughts on this.

@vanzin
Contributor

vanzin commented Dec 12, 2017

@devaraj-kavali why is this a WIP? Are you planning to work more on it before asking for feedback?

@devaraj-kavali
Author

@vanzin Thanks for looking into this.

I wanted to verify some scenarios before removing WIP; feedback is welcome anytime. I now see there are some code conflicts; I will resolve them and remove WIP.

@devaraj-kavali devaraj-kavali changed the title [SPARK-22404][YARN][WIP] Provide an option to use unmanaged AM in yarn-client mode [SPARK-22404][YARN] Provide an option to use unmanaged AM in yarn-client mode Dec 14, 2017
@vanzin
Contributor

vanzin commented Dec 19, 2017

ok to test

@SparkQA

SparkQA commented Dec 19, 2017

Test build #85125 has finished for PR 19616 at commit cba0c6d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

private val isClientUnmanagedAMEnabled =
sparkConf.getBoolean("spark.yarn.un-managed-am", false) && !isClusterMode
Contributor

This should be a config constant. Also unmanagedAM is more in line with other config names.

Author

Updated the config name and also added config constants.

// UI's environment page. This works for client mode; for cluster mode, this is handled
// by the AM.
CACHE_CONFIGS.foreach(sparkConf.remove)
if (!isClientUnmanagedAMEnabled) {
Contributor

Why is this needed in the new mode?

Author

It clears the classpath entries, which leads to this error in the executors:

Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend

Contributor

I think this is happening because you're starting the AM after these are removed from the conf. Should probably juggle things around or change how these are provided to the AM, since these configs are super noisy and shouldn't really show up in the UI.
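
A rough sketch of the reordering idea (cachedResourcesConf is a hypothetical name; CACHE_CONFIGS comes from the surrounding code): snapshot the cache-related entries for the in-process AM before scrubbing them from the conf that feeds the UI.

    // Keep a copy of the cache configs for the unmanaged AM, then remove them from the
    // driver conf so they don't clutter the environment page.
    val cachedResourcesConf = new SparkConf(false)
    CACHE_CONFIGS.foreach { entry =>
      sparkConf.getOption(entry.key).foreach(cachedResourcesConf.set(entry.key, _))
    }
    CACHE_CONFIGS.foreach(sparkConf.remove)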

populateClasspath(args, hadoopConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
if (isClientUnmanagedAMEnabled) {
System.setProperty("SPARK_YARN_STAGING_DIR", stagingDirPath.toString)
Contributor

Can this be propagated some other way? Using system properties is kinda hacky, and makes it dangerous to run another Spark app later in the same JVM.

Author

Changed it to derive the path from the Spark conf and the application id.
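
A rough sketch of that derivation (STAGING_DIR and the .sparkStaging layout come from the surrounding Client code; hadoopConf and appId are assumed to be in scope):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Rebuild the staging path from the conf and the application id instead of reading
    // a JVM-wide system property.
    val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    val stagingDirPath = new Path(appStagingBaseDir, s".sparkStaging/${appId.toString}")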

private def startApplicationMasterService(report: ApplicationReport) = {
// Add AMRMToken to establish connection between RM and AM
val token = report.getAMRMToken
val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
Contributor

Why do you need to make this copy? Isn't the Token above enough?

Author

report.getAMRMToken returns an org.apache.hadoop.yarn.api.records.Token instance, but currentUGI.addToken expects an org.apache.hadoop.security.token.Token instance.
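
A sketch of the conversion being described, assuming the usual Hadoop record shape (ByteBuffer-backed identifier/password, string kind/service):

    import org.apache.hadoop.io.Text
    import org.apache.hadoop.security.UserGroupInformation
    import org.apache.hadoop.security.token.Token
    import org.apache.hadoop.yarn.security.AMRMTokenIdentifier

    val token = report.getAMRMToken  // org.apache.hadoop.yarn.api.records.Token
    val amRMToken = new Token[AMRMTokenIdentifier](
      token.getIdentifier.array(), token.getPassword.array(),
      new Text(token.getKind), new Text(token.getService))
    UserGroupInformation.getCurrentUser.addToken(amRMToken)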

val currentUGI = UserGroupInformation.getCurrentUser
currentUGI.addToken(amRMToken)

System.setProperty(
Contributor

Same question about using system properties.

Author

I changed it to set the value in sparkConf and use it in ApplicationMaster when getting the containerId.

* Common application master functionality for Spark on Yarn.
*/
private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends Logging {
private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkConf: SparkConf,
Contributor

This doesn't follow Spark's convention for multi-line arguments.

This also looks a little odd now, because there are conflicting arguments. ApplicationMasterArguments is now only used in cluster mode, and everything else is expected to be provided in the other parameters. So while this is the simpler change, it's also a little ugly.

I don't really have a good suggestion right now, but it's something to think about.

Author

I made changes to the default constructor and added another constructor. Please check and let me know if anything can be done better.

System.setProperty(
ApplicationConstants.Environment.CONTAINER_ID.name(),
ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
val amArgs = new ApplicationMasterArguments(Array("--arg",
Contributor

This is pretty weird, I'd make this an explicit constructor argument for the AM instead. But if I understand this correctly, this is the address the AM will be connecting back to the driver, right?

It seems like there's an opportunity for better code here, since now they'd both be running in the same process. Like in the cluster mode case, where the AM uses the same RpcEnv instance as the driver (see runDriver()).

Author

I added another constructor that takes an RpcEnv instead of ApplicationMasterArguments, so the AM uses the same instance.

val amArgs = new ApplicationMasterArguments(Array("--arg",
sparkConf.get("spark.driver.host") + ":" + sparkConf.get("spark.driver.port")))
// Start Application Service in a separate thread and continue with application monitoring
new Thread() {
Contributor

Don't you want to keep a reference to this thread and join it at some point, to make sure it really goes away? Should it be a daemon thread instead?

Author

Changed it to a daemon thread.
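
For illustration, the general shape of such a daemon thread (the body is a placeholder):

    val amService = new Thread("Unmanaged Application Master Service") {
      override def run(): Unit = {
        // start the in-process ApplicationMaster here
      }
    }
    amService.setDaemon(true)
    amService.start()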

@SparkQA

SparkQA commented Feb 15, 2018

Test build #87464 has finished for PR 19616 at commit ce94235.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 15, 2018

Test build #87462 has finished for PR 19616 at commit 19b6c3a.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@devaraj-kavali
Author

@vanzin Thanks for the review; can you have a look at the updated PR?

@vanzin
Contributor

vanzin commented Mar 8, 2018

Unlikely I'll be able to review this very soon.

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93055 has finished for PR 19616 at commit ce94235.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 17, 2018

Test build #93144 has finished for PR 19616 at commit 0921f7a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@vanzin vanzin left a comment

Hi @devaraj-kavali , sorry this got left behind. It needs updating and I need to think some more about the changes to ApplicationMaster.scala, but it's in the right direction.


def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
def getContainerId(sparkConf: SparkConf): ContainerId = {
val containerIdString =
Contributor

indentation

Author

corrected the indentation

val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
def getContainerId(sparkConf: SparkConf): ContainerId = {
val containerIdString =
if (System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) != null) {
Contributor

better to use sparkConf.getenv.

Author

updated to use sparkConf.getenv
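
Roughly what that fallback looks like (the spark.yarn.containerId key is the intermediate approach at this point in the review; it is later replaced by passing the attempt id from Client.scala):

    import org.apache.hadoop.yarn.api.ApplicationConstants
    import org.apache.hadoop.yarn.api.records.ContainerId

    def getContainerId(sparkConf: SparkConf): ContainerId = {
      // Prefer the env var set by YARN; fall back to the value the client stored in the conf.
      val containerIdString =
        Option(sparkConf.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()))
          .getOrElse(sparkConf.get("spark.yarn.containerId"))
      ContainerId.fromString(containerIdString)
    }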

sparkConf.set("spark.yarn.containerId",
ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
// Start Application Service in a separate thread and continue with application monitoring
val amService = new Thread() {
Contributor

Thread name?

Author

added thread name

// Add AMRMToken to establish connection between RM and AM
val token = report.getAMRMToken
val amRMToken: org.apache.hadoop.security.token.Token[AMRMTokenIdentifier] =
new org.apache.hadoop.security.token.Token[AMRMTokenIdentifier](token
Contributor

Keep related calls on the same line (e.g. token.getIdentifier(), new Text(blah)).

Author

Made the change; please let me know if anything better can be done here.

val yarnConf: YarnConfiguration)
extends Logging {

def this(sparkConf: SparkConf,
Contributor

See above constructor for multi-line args style.

Author

Removed this constructor as part of the refactoring for the comment below.

RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
var driverRef : RpcEndpointRef = null
if (sparkConf.get(YARN_UNMANAGED_AM)) {
Contributor

I'm not a big fan of this change. Feels like you should have a different method here called runUnmanaged that is called instead of run(), and takes an RpcEnv.

That way you don't need to keep clientRpcEnv at all since it would be local to that method, since nothing else here needs it. In fact even rpcEnv could go away and become a parameter to createAllocator...

Author

Refactored it into a runUnmanaged method; please let me know if anything can be done better.
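
A skeleton of that refactor, assuming the method lives in ApplicationMaster with access to sparkConf, addAmIpFilter and createAllocator (the exact parameter list evolves in later revisions of the diff):

    def runUnmanaged(clientRpcEnv: RpcEnv): Unit = {
      // Resolve the driver endpoint from the shared RpcEnv instead of a remote address.
      val driverRef = clientRpcEnv.setupEndpointRef(
        RpcAddress(sparkConf.get("spark.driver.host"), sparkConf.get("spark.driver.port").toInt),
        YarnSchedulerBackend.ENDPOINT_NAME)
      addAmIpFilter(Some(driverRef))
      createAllocator(driverRef, sparkConf, clientRpcEnv)
    }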

@devaraj-kavali
Author

Thanks @vanzin for taking the time to look into this; I will update it with the changes.

@vanzin
Contributor

vanzin commented Dec 13, 2018

ok to test

@SparkQA

SparkQA commented Dec 13, 2018

Test build #100052 has finished for PR 19616 at commit 837d25f.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@devaraj-kavali
Author

devaraj-kavali commented Dec 14, 2018

since these configs are super noisy and shouldn't really show up in the UI.

These configs are removed from sparkConf in ApplicationMaster after they are used.

@SparkQA

SparkQA commented Dec 14, 2018

Test build #100163 has finished for PR 19616 at commit 65aeba9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@devaraj-kavali
Author

@vanzin can you check the updated changes? thanks

}

private def runImpl(): Unit = {
private def runImpl(opBlock: => Unit): Unit = {
Contributor

There are things in this method that don't look right when you think about an unmanaged AM.

e.g., overriding spark.master, spark.ui.port, etc., looks wrong.

The handling of app attempts also seems wrong, since with an unmanaged AM you don't have multiple attempts. Even the shutdown hooks seem a bit out of place.

Seems to me it would be easier not to try to use this method for the unmanaged AM.

Author

refactored this code

addAmIpFilter(Some(driverRef))
createAllocator(driverRef, sparkConf, clientRpcEnv)

// In client mode the actor will stop the reporter thread.
Contributor

actor?

Author

Removed this as part of the refactoring for the comment above.

val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
if (!preserveFiles) {
stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
Contributor

val stagingDir = sys.props.get("...").getOrElse { ... }

Author

Made the change to pass the stagingDir from Client.scala

stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
var stagingDir = System.getenv("SPARK_YARN_STAGING_DIR")
if (stagingDir == null) {
val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
Contributor

This looks similar to the logic in Client.scala. Maybe the value calculated there should be plumbed through, instead of adding this code.

Author

Made the change to pass the stagingDir from Client.scala


if (isClientUnmanagedAMEnabled) {
// Set Unmanaged AM to true in Application Submission Context
appContext.setUnmanagedAM(true)
Contributor

appContext.setUnmanagedAM(isClientUnmanagedAMEnabled)

Which also makes the comment unnecessary.

Author

updated

}

if (state == YarnApplicationState.ACCEPTED && isClientUnmanagedAMEnabled
&& !amServiceStarted && report.getAMRMToken != null) {
Contributor

indent one more level

Author

updated

ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
// Start Application Service in a separate thread and continue with application monitoring
val amService = new Thread("Unmanaged Application Master Service") {
override def run(): Unit = new ApplicationMaster(new ApplicationMasterArguments(Array.empty),
Contributor

This is a pretty long line. Break it down.

Author

updated it.

currentUGI.addToken(amRMToken)

sparkConf.set("spark.yarn.containerId",
ContainerId.newContainerId(report.getCurrentApplicationAttemptId, 1).toString)
Contributor

Won't this name be the same as the first executor created by the app?

I'd rather special-case getContainerId to return some baked-in string when the env variable is not set.

Author

made the change to pass the appAttemptId from Client.scala

@SparkQA

SparkQA commented Dec 20, 2018

Test build #100328 has finished for PR 19616 at commit 93b016f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), None).setCurrentContext()

// This shutdown hook should run *after* the SparkContext is shut down.
Contributor

This is client mode, so you can't rely on shutdown hooks. You need to explicitly stop this service when the SparkContext is shut down.

Imagine someone just embeds sc = new SparkContext(); ...; sc.stop() in their app code, but the app itself runs for way longer than the Spark app.
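
A sketch of that idea with hypothetical names (stopUnmanaged is mentioned later in the review, but its exact signature and the owning method are assumptions): whoever owns the in-process AM tears it down when the SparkContext stops, rather than from a shutdown hook.

    // Hypothetical helper, called from the client-mode backend's stop path.
    def stopApplicationMasterService(): Unit = {
      if (appMaster != null) {
        appMaster.stopUnmanaged(stagingDirPath)  // signature assumed
        appMaster = null
      }
    }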

}
}

def runUnmanaged(clientRpcEnv: RpcEnv,
Contributor

Multi-line args start on the next line.

throw new SparkException("While loop is depleted! This should never happen...")
}

private def startApplicationMasterService(report: ApplicationReport) = {
Contributor

: Unit =

But given you should be explicitly stopping the AM, this should probably return the AM itself.

@SparkQA

SparkQA commented Jan 10, 2019

Test build #101002 has finished for PR 19616 at commit 23ad9de.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 10, 2019

Test build #101003 has finished for PR 19616 at commit 1c02b7d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 10, 2019

Test build #100997 has finished for PR 19616 at commit dc31940.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 15, 2019

Test build #101277 has finished for PR 19616 at commit 2429e19.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@devaraj-kavali
Author

@vanzin can you review the latest changes when you have some time? thanks

Contributor

@vanzin vanzin left a comment

Some minor comments. It would be good to add a test for this in YarnClusterSuite.

// In cluster mode, do not rely on the disassociated event to exit
// This avoids potentially reporting incorrect exit codes if the driver fails
if (!isClusterMode) {
if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
Contributor

Update comment above?

private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
private var amServiceStarted = false
Contributor

Do you need this extra flag? Could you just check if appMaster != null?

private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
private var amServiceStarted = false
private var appMaster: ApplicationMaster = _
private var unManagedAMStagingDirPath: Path = _
Contributor

Seems better to just store this in a variable for all cases. It's recomputed from the conf in a few different places in this class.


/* Unmanaged AM configuration. */

private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM")
Contributor

Add .enabled to the config key.
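
Combined with the earlier request for a config constant, the final entry could look roughly like this (standard ConfigBuilder pattern; the doc text is illustrative):

    private[spark] val YARN_UNMANAGED_AM = ConfigBuilder("spark.yarn.unmanagedAM.enabled")
      .doc("In client mode, whether to launch the Application Master service as part of " +
        "the client using unmanaged AM.")
      .booleanConf
      .createWithDefault(false)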

@SparkQA

SparkQA commented Jan 24, 2019

Test build #101610 has finished for PR 19616 at commit 6854fc4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@devaraj-kavali
Author

@vanzin can you check the updated changes? thanks

Contributor

@vanzin vanzin left a comment

What's the behavior when the AM fails before the context is stopped?

From the code I see some stuff is printed to the logs and the YARN app is marked as finished. But does the context remain alive? Or should that event cause the context to be stopped?

I'm mostly concerned with how clear it is for the user that the jobs start failing because the context is now unusable.

finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
"Uncaught exception: " + StringUtils.stringifyException(e))
if (!unregistered) {
Contributor

Is this code needed here? Won't it be called when the client calls stopUnmanaged?

Author

appMaster.runUnmanaged runs in a daemon thread. If something unexpected happens in appMaster.runUnmanaged, the daemon thread stops, but the monitor thread will not know about it and will continue with the status as ACCEPTED/RUNNING. This code unregisters with the RM so that the Client/monitor thread gets the application report status as FAILED and stops the services, including the SparkContext.

}

test("run Spark in yarn-client mode with unmanaged am") {
testBasicYarnApp(true, Map("spark.yarn.unmanagedAM.enabled" -> "true"))
Contributor

YARN_UNMANAGED_AM.key
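
That is, the test would reference the constant rather than the literal key, roughly:

    test("run Spark in yarn-client mode with unmanaged am") {
      testBasicYarnApp(true, Map(YARN_UNMANAGED_AM.key -> "true"))
    }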

@devaraj-kavali
Author

What's the behavior when the AM fails before the context is stopped?

From the code I see some stuff is printed to the logs and the YARN app is marked as finished. But does the context remain alive? Or should that event cause the context to be stopped?

I'm mostly concerned with how clear it is for the user that the jobs start failing because the context is now unusable.

If the AM fails before the context is stopped, the AM reports the FAILED status to the RM, and the Client receives the FAILED status as part of monitoring and stops the services, including the context.

@SparkQA

SparkQA commented Jan 25, 2019

Test build #101658 has finished for PR 19616 at commit 3b377af.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Jan 25, 2019

If AM fails before the context is stopped...

That explains the YARN side of things. What about the Spark side of things? What will the user see? Is it clear to the user that the context is now unusable because the YARN app is not running anymore?

Or will they get weird errors because of executors not being allocated and things like that?

@devaraj-kavali
Author

That explains the YARN side of things. What about the Spark side of things? What will the user see? Is it clear to the user that the context is now unusable because the YARN app is not running anymore?

Or will they get weird errors because of executors not being allocated and things like that?

It logs an error from YarnClientSchedulerBackend that the YARN application has exited unexpectedly with state FAILED!..., and the user sees (No active SparkContext.) when trying to access it. There are no additional errors; the behavior is similar to an application failing in yarn-client mode without unmanaged AM enabled.

@vanzin
Contributor

vanzin commented Jan 25, 2019

Sounds good. Merging to master.

@asfgit asfgit closed this in f06bc0c Jan 25, 2019
@devaraj-kavali
Author

Thank you so much @vanzin.

// Add log urls
container.foreach { c =>
sys.env.get("SPARK_USER").foreach { user =>
sys.env.filterKeys(_.endsWith("USER")).foreach { user =>
Contributor

@HeartSaVioR HeartSaVioR Jan 26, 2019

@devaraj-kavali @vanzin

While resolving the merge conflict in #23260, I found two issues here:

  1. What output do we expect when the env has more than one matching key? This looks like it always uses the last key, which is nondeterministic if there is more than one matching key.

  2. Here user is not a value but a (key, value) pair, so we need to use either the key or the value (I guess we would like to pick the value).

Author

Thanks @HeartSaVioR for finding and reporting it; it was my mistake, I am sorry for that. I have created PR #23659 to fix the issue.
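
To illustrate observation 2 above (just the destructuring, not necessarily the fix taken in #23659): filterKeys yields (key, value) pairs, so the value has to be extracted explicitly.

    // `user` below is bound to the value of the (key, value) pair.
    sys.env.filterKeys(_.endsWith("USER")).foreach { case (_, user) =>
      // build the log urls for `user` here
    }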

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ent mode

## What changes were proposed in this pull request?

Providing a new configuration "spark.yarn.un-managed-am" (defaults to false) to enable the Unmanaged AM Application in Yarn Client mode which launches the Application Master service as part of the Client. It utilizes the existing code for communicating between the Application Master <-> Task Scheduler for the container requests/allocations/launch, and eliminates these,
1. Allocating and launching the Application Master container
2. Remote Node/Process communication between Application Master <-> Task Scheduler

## How was this patch tested?

I verified manually running the applications in yarn-client mode with "spark.yarn.un-managed-am" enabled, and also ensured that there is no impact to the existing execution flows.

I would like to hear others feedback/thoughts on this.

Closes apache#19616 from devaraj-kavali/SPARK-22404.

Authored-by: Devaraj K <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>