
Conversation

@AngersZhuuuu (Contributor) commented Mar 7, 2023

What changes were proposed in this pull request?

Currently, when we run a yarn-client mode application through SparkSubmit and an exception is caught during runMain(), Spark won't call SparkContext.stop(), since PR #33403 removed that behavior. The AM side will then mark the application as SUCCESS.

In this PR, we revert the behavior of #33403 for YARN mode: sc.stop() is called again in a YARN environment, and the client side passes the correct exit code to the YARN AM side.

This PR fixes this issue.
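
For reference, a rough sketch of the intended shape of the try/finally block in SparkSubmit.runMain after this change, based on the Spark 3.1.2 code quoted later in this thread and on the diff under review. This is SparkSubmit-internal code, so it is not compilable on its own, and the way exitCode is derived from the failure is an assumption, not the final implementation:

    // Sketch only: stop the active SparkContext on the way out and forward the
    // client-side exit code so the YARN AM can report a matching final status.
    var exitCode = 0
    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        exitCode = 1  // assumed mapping: any failure -> a non-zero exit code
        throw findCause(t)
    } finally {
      if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) &&
          !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) {
        try {
          // Forward the exit code instead of calling the no-arg stop().
          SparkContext.getActive.foreach(_.stop(exitCode))
        } catch {
          case e: Throwable => logError(s"Failed to close SparkContext: $e")
        }
      }
    }

The guard mirrors the 3.1.2 condition quoted further down, with isConnectServer added as in the diff under review.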

Why are the changes needed?

Keep the exit code consistent between the client side and the YARN AM side.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually tested.

The screenshots below are from our internal platform, which shows each application's status; the status information comes from the YARN RM and the timeline service.

Before change
[Screenshot 2023-03-08 11:45:31 AM]

After change

[Screenshot 2023-03-08 11:45:13 AM]
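
For reviewers without access to that internal platform, one way to check the YARN-reported final status directly is to query the ResourceManager with the YarnClient API. A small illustrative sketch, not part of this PR; the application ID is a placeholder:

    import org.apache.hadoop.yarn.api.records.ApplicationId
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    // After a failed client-mode run, ask the YARN RM for the application's
    // final status to confirm it is no longer reported as SUCCEEDED.
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    val appId = ApplicationId.fromString("application_1234567890123_0001")
    val report = yarnClient.getApplicationReport(appId)
    println(s"Final status reported by YARN: ${report.getFinalApplicationStatus}")
    yarnClient.stop()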

github-actions bot added the CORE label Mar 7, 2023
@dongjoon-hyun (Member) left a comment

Do you think you can add a test case, @AngersZhuuuu?

      !isConnectServer(args.mainClass)) {
        try {
-         SparkContext.getActive.foreach(_.stop())
+         SparkContext.getActive.foreach(_.stop(exitCode))
Member:
I don't think this is related to the YARN AM, because this is guarded by if (args.master.startsWith("k8s")). Is this a K8s patch instead of a YARN AM one?

@AngersZhuuuu (Contributor Author):

The code in Spark 3.1.2 is:

      if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) &&
        !isThriftServer(args.mainClass)) {

It seems #33403 changed the behavior.
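
For context, a rough reconstruction of the guard that #33403 left in SparkSubmit.runMain, paraphrased from the condition quoted in the review comment above; the exact current wording may differ slightly:

    // Reconstruction, not verbatim source: after #33403 the active SparkContext
    // is only stopped here when running against a K8s master, so a YARN
    // client-mode application never reaches this stop call.
    if (args.master.startsWith("k8s") &&
        !isShell(args.primaryResource) && !isSqlShell(args.mainClass) &&
        !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) {
      try {
        SparkContext.getActive.foreach(_.stop())
      } catch {
        case e: Throwable => logError(s"Failed to close SparkContext: $e")
      }
    }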

@AngersZhuuuu (Contributor Author):

I don't know much about the K8s scheduler, but for YARN client mode we also need to keep the exit code consistent.

@dongjoon-hyun (Member) left a comment

Hi, @AngersZhuuuu.
This PR seems to have insufficient information. Could you provide more details about how to validate this, and in what environment?

@AngersZhuuuu (Contributor Author):

Hi, @AngersZhuuuu.
This PR seems to have insufficient information. Could you provide more details about how to validate this, and in what environment?

We ran a client-mode SparkSubmit job and it threw the exception below:

23/03/07 18:34:50 INFO YarnClientSchedulerBackend: Shutting down all executors
23/03/07 18:34:50 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
23/03/07 18:34:50 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
23/03/07 18:34:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/03/07 18:34:50 INFO BlockManager: BlockManager stopped
23/03/07 18:34:50 INFO BlockManagerMaster: BlockManagerMaster stopped
23/03/07 18:34:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/03/07 18:34:50 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: xxx.xxx; line 1 pos 14;
'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Project [*]
      +- 'UnresolvedRelation [xxx, xxx], [], false

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:115)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:95)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:184)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:183)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:183)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:95)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:92)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:155)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:178)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:175)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:778)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:778)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:621)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:778)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:616)
	at org.apache.spark.sql.auth.QueryAuthChecker$.main(QueryAuthChecker.scala:33)
	at org.apache.spark.sql.auth.QueryAuthChecker.main(QueryAuthChecker.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/03/07 18:34:50 INFO ShutdownHookManager: Shutdown hook called
23/03/07 18:34:50 INFO ShutdownHookManager: Deleting directory /tmp/spark-8ce833e1-3cd4-4a9f-960d-695be85b12f4
23/03/07 18:34:50 INFO ShutdownHookManager: Deleting directory /hadoop/spark/sparklocaldir/spark-58bbd530-6144-4ad6-b62b-a690baac9f96
23/03/07 18:34:50 INFO SparkExecutionPlanProcessor: Lineage thread pool prepares to shut down
23/03/07 18:34:50 INFO SparkExecutionPlanProcessor: Lineage thread pool finishes to await termination and shuts down

This job failed, but even though sparkContext.stop() was called, the client side failed while the AM shows SUCCESS.
In Spark 3.1.2 the code looks like this:

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    } finally {
      if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) &&
        !isThriftServer(args.mainClass)) {
        try {
          SparkContext.getActive.foreach(_.stop())
        } catch {
          case e: Throwable => logError(s"Failed to close SparkContext: $e")
        }
      }
    }

So here, for a normal job, I think we should pass the exit code to the SchedulerBackend, right?

After your mention of it, I see that #33403 changed the behavior so that only K8s calls sc.stop(); I think for both K8s and YARN mode we need to pass the exit code to the backend.

After this PR, we also need to check whether the K8s backend's exit code matches the client side in client mode too.
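
To make the flow being described concrete, a purely hypothetical sketch; these names and signatures are illustrative only and are not claimed to match Spark's real SchedulerBackend API. The point is that the client-side exit code reaches the backend, which the AM then uses to decide the final application status it reports:

    // Hypothetical, self-contained illustration of the idea; not Spark code.
    trait ExitCodeAwareBackend {
      // Client side: remember why the application is stopping.
      def stop(exitCode: Int): Unit
    }

    object IllustrativeYarnBackend extends ExitCodeAwareBackend {
      override def stop(exitCode: Int): Unit = {
        // In the real system the AM would unregister from the RM with this
        // status; here we only show that it is derived from the client-side
        // exit code instead of being hard-coded to SUCCEEDED.
        val finalStatus = if (exitCode == 0) "SUCCEEDED" else "FAILED"
        println(s"AM would report final status: $finalStatus")
      }
    }

    // Example: a failed driver passes a non-zero exit code.
    IllustrativeYarnBackend.stop(1)  // prints: AM would report final status: FAILED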

AngersZhuuuu changed the title from "[SPARK-42698][CORE] SparkSubmit should pass exitCode to AM side" to "[SPARK-42698][CORE] SparkSubmit should pass exitCode to AM side for yarn mode" on Mar 8, 2023
@cloud-fan (Contributor):

Does YARN still have this issue with Spark 3.4?

@dongjoon-hyun (Member):

cc @mridulm and @tgravescs, too.

@AngersZhuuuu (Contributor Author):

Does YARN still have this issue with Spark 3.4?

I didn't see such a fix in the current code.

@cloud-fan (Contributor):

This seems to be a revert of #33403, as now we stop the SparkContext in the YARN environment as well. We should justify it in the PR description. This is not simply passing the exitCode. Please update the PR title as well.

AngersZhuuuu changed the title from "[SPARK-42698][CORE] SparkSubmit should pass exitCode to AM side for yarn mode" to "[SPARK-42698][CORE] SparkSubmit should also stop SparkContext when exit program in yarn mode and pass exitCode to AM side" on Mar 10, 2023
@AngersZhuuuu (Contributor Author):

This seems to be a revert of #33403, as now we stop the SparkContext in the YARN environment as well. We should justify it in the PR description. This is not simply passing the exitCode. Please update the PR title as well.

Done.

@cloud-fan (Contributor):

@dongjoon-hyun do you have more context about #33403? Why do we limit the SparkContext-stopping behavior to K8s only?

@AngersZhuuuu (Contributor Author):

The failed UT should not be related to this PR.

@AngersZhuuuu (Contributor Author):

@cloud-fan It seems #32283 originally wanted to fix the issue for K8s, and then @dongjoon-hyun limited it to the K8s env. But this can also work for the YARN env...

@dongjoon-hyun (Member):

To @cloud-fan and all. Here is the full context.

Three months after merging the second commit, there was a post-commit review.

Since SPARK-34674 had already been released in Spark 3.1.2, following the post-commit comment I made a new JIRA, SPARK-36193 (#33403), which was released in 3.1.3.

  • [SPARK-36193][CORE] Recover SparkSubmit.runMain not to stop SparkContext in non-K8s env

@cloud-fan (Contributor):

@AngersZhuuuu can you comment on the original discussion thread and convince related people to add back sc.stop? #32081 (comment)

@AngersZhuuuu (Contributor Author):

@AngersZhuuuu can you comment on the original discussion thread and convince related people to add back sc.stop? #32081 (comment)

TBH, you can think of it as a new feature: they originally only wanted to support K8s, and this PR supports YARN too.

@cloud-fan (Contributor):

I believe people in that discussion thread have the most context (some of them are committers) and I'm not comfortable merging it without them taking a look.


@github-actions (bot):

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

github-actions bot added the Stale label Jul 17, 2023
