Conversation

@LantaoJin
Contributor

What changes were proposed in this pull request?

#24595 introduced `private val runId: UUID = UUID.randomUUID` in `BroadcastExchangeExec` to cancel the broadcast execution in the Future when a timeout happens. Since the `runId` is a random UUID instead of inheriting the job group id, the broadcast sub-jobs keep executing after the SQL statement is cancelled. This PR uses the job group id of the outer thread as the `runId` to abort these broadcast sub-jobs when the SQL statement is cancelled.
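
In code, the fix boils down to the following (excerpted from the review diff further down): derive the `runId` from the job group id local property when present, and fall back to a random UUID otherwise.

```scala
// Cancelling a SQL statement from Spark ThriftServer needs to cancel its
// related broadcast sub-jobs, so reuse the job group id (the statementId set
// by STS) as the broadcast runId when it exists.
override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
    .map(UUID.fromString).getOrElse(UUID.randomUUID)
```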

Why are the changes needed?

When broadcasting a table takes too long, the SQL statement may be cancelled; however, the background Spark job keeps running and wastes resources.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manually tested.
Broadcasting a table completes too quickly to cancel in a unit test, but it is easy to verify manually:

  1. Start a Spark thrift-server with limited resources in YARN.
  2. While the driver is running but no executors have launched, submit from beeline a SQL statement that broadcasts tables.
  3. Cancel the SQL statement in beeline.

Without the patch, broadcast sub-jobs won't be cancelled.
![Screen Shot 2021-01-11 at 12 03 13 PM](https://user-images.githubusercontent.com/1853780/104150975-ab024b00-5416-11eb-8bf9-b5167bdad80a.png)

With this patch, broadcast sub-jobs will be cancelled.
![Screen Shot 2021-01-11 at 11 43 40 AM](https://user-images.githubusercontent.com/1853780/104150994-be151b00-5416-11eb-80ff-313d423c8a2e.png)

@github-actions github-actions bot added the SQL label Jan 11, 2021
@LantaoJin
Contributor Author

LantaoJin commented Jan 11, 2021

Without the patch, the driver log shows that Job 0 and Job 1 are still executing after the cancel request:

```
21/01/10 21:02:33,406 INFO [HiveServer2-Handler-Pool: Thread-255] thriftserver.SparkExecuteStatementOperation:57 : Submitting query 'select s... where rn<=2' with e5a71a11-817e-43f0-b0ae-083313d71a4b
21/01/10 21:02:33,411 INFO [HiveServer2-Background-Pool: Thread-260] query:258 : Running query [e5a71a11-817e-43f0-b0ae-083313d71a4b] in session [36ded59a-d5c6-45f0-b83b-d83034085d13] 'select ... where rn<=2'

21/01/10 21:02:43,269 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler:57 : Submitting ResultStage 0 (MapPartitionsRDD[7] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266), which has no missing parents
21/01/10 21:02:43,362 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler:57 : Submitting ResultStage 1 (MapPartitionsRDD[6] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266), which has no missing parents

21/01/10 21:02:53,543 INFO [HiveServer2-Handler-Pool: Thread-255] thriftserver.SparkExecuteStatementOperation:57 : Cancel query with e5a71a11-817e-43f0-b0ae-083313d71a4b
21/01/10 21:02:53,543 INFO [spark-listener-group-appStatus] ui.SQLAppStatusListener:434 : [TaskProgress] execution ends active tasks: 0, completed tasks: 0, total tasks: 462.
21/01/10 21:02:53,543 INFO [HiveServer2-Handler-Pool: Thread-255] scheduler.DAGScheduler:57 : Asked to cancel job group e5a71a11-817e-43f0-b0ae-083313d71a4b
21/01/10 21:02:53,545 INFO [HiveServer2-Background-Pool: Thread-260] scheduler.DAGScheduler:57 : Asked to cancel job group e5a71a11-817e-43f0-b0ae-083313d71a4b
```

With the patch, the related driver log shows that Job 0 and Job 1 failed because they were cancelled:

```
21/01/10 20:43:13,250 INFO [HiveServer2-Handler-Pool: Thread-257] thriftserver.SparkExecuteStatementOperation:57 : Submitting query 'select ... where rn<=2' with c971270f-231d-42d5-af35-86463416861d
21/01/10 20:43:13,252 INFO [HiveServer2-Background-Pool: Thread-263] query:258 : Running query [c971270f-231d-42d5-af35-86463416861d] in session [692f89f2-087d-4879-94d9-605c7c6e6c6c] 'select ... where rn<=2'

21/01/10 20:43:19,063 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler:57 : Submitting ResultStage 0 (MapPartitionsRDD[7] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266), which has no missing parents
21/01/10 20:43:19,175 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler:57 : Submitting ResultStage 1 (MapPartitionsRDD[6] at $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266), which has no missing parents

21/01/10 20:43:23,273 INFO [HiveServer2-Handler-Pool: Thread-257] thriftserver.SparkExecuteStatementOperation:57 : Cancel query with c971270f-231d-42d5-af35-86463416861d
21/01/10 20:43:23,273 INFO [HiveServer2-Handler-Pool: Thread-257] scheduler.DAGScheduler:57 : Asked to cancel job group c971270f-231d-42d5-af35-86463416861d
21/01/10 20:43:23,274 INFO [spark-listener-group-appStatus] ui.SQLAppStatusListener:434 : [TaskProgress] execution ends active tasks: 0, completed tasks: 0, total tasks: 462.
21/01/10 20:43:23,276 INFO [HiveServer2-Background-Pool: Thread-263] scheduler.DAGScheduler:57 : Asked to cancel job group c971270f-231d-42d5-af35-86463416861d
21/01/10 20:43:23,278 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Cancelling stage 0
21/01/10 20:43:23,279 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Killing all running tasks in stage 0: Stage cancelled
21/01/10 20:43:23,280 INFO [HiveServer2-Handler-Pool: Thread-257] scheduler.DAGScheduler:57 : Asked to cancel job group c971270f-231d-42d5-af35-86463416861d
21/01/10 20:43:23,280 INFO [HiveServer2-Handler-Pool: Thread-257] thriftserver.SparkExecuteStatementOperation:57 : Close statement with c971270f-231d-42d5-af35-86463416861d
21/01/10 20:43:23,284 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Removed TaskSet 0.0, whose tasks have all completed, from pool default
21/01/10 20:43:23,285 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Stage 0 was cancelled
21/01/10 20:43:23,285 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler:57 : ResultStage 0 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:266) failed in 4.194 s due to Job 0 cancelled part of cancelled job group c971270f-231d-42d5-af35-86463416861d.
21/01/10 20:43:23,291 INFO [broadcast-exchange-0] scheduler.DAGScheduler:57 : Job 0 failed: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266, took 4.257828 s
21/01/10 20:43:23,292 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Cancelling stage 1
21/01/10 20:43:23,293 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Killing all running tasks in stage 1: Stage cancelled
21/01/10 20:43:23,293 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Removed TaskSet 1.0, whose tasks have all completed, from pool default
21/01/10 20:43:23,293 INFO [dag-scheduler-event-loop] cluster.YarnClusterScheduler:57 : Stage 1 was cancelled
21/01/10 20:43:23,294 INFO [dag-scheduler-event-loop] scheduler.DAGScheduler:57 : ResultStage 1 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:266) failed in 4.108 s due to Job 1 cancelled part of cancelled job group c971270f-231d-42d5-af35-86463416861d.
21/01/10 20:43:23,295 INFO [broadcast-exchange-1] scheduler.DAGScheduler:57 : Job 1 failed: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:266, took 4.261140 s
```

@SparkQA

SparkQA commented Jan 11, 2021

Test build #133907 has finished for PR 31119 at commit 32adf03.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38499/

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38499/

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38503/

@SparkQA

SparkQA commented Jan 11, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38503/

@SparkQA

SparkQA commented Jan 11, 2021

Test build #133912 has finished for PR 31119 at commit 804f4da.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 11, 2021

Test build #133916 has finished for PR 31119 at commit 03d79d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum wangyum requested a review from cloud-fan January 12, 2021 00:21
```scala
import BroadcastExchangeExec._

override val runId: UUID = UUID.randomUUID
private val groupId = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
```
Contributor

is it reliable? does the local property always contain the group id of the query that this broadcast belongs to?

Contributor Author

For STS, I think so. Building this physical plan is performed in a HiveServer2-Background-Pool thread, the same thread that sets the job group id and does the parsing, analyzing, optimizing, etc. Keeping UUID.randomUUID as a fallback is to avoid UT failures, IIUC.

@cloud-fan
Contributor

Is "canceling SQL statement" a STS specific feature? It doesn't seem to be implemented properly, as we lack the ability to track all the spark jobs submitted by one SQL statement.

@cloud-fan
Contributor

hmm, IIRC the SQL UI can know all the jobs of a SQL statement, how is that done? cc @gengliangwang

@LantaoJin
Contributor Author

LantaoJin commented Jan 12, 2021

Is "canceling SQL statement" a STS specific feature? It doesn't seem to be implemented properly, as we lack the ability to track all the spark jobs submitted by one SQL statement.

Job group is not an STS-specific feature; it's an API in SparkContext. But currently only STS sets it (using the statementId in SparkExecuteStatementOperation); the SQL CLI doesn't use it. So falling back to a random UUID when no job group id is specified should be compatible with the current implementation.
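
For reference, a minimal sketch of that SparkContext API (the group id value and the commented-out job are illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the job group API discussed above.
val spark = SparkSession.builder().master("local[*]").appName("job-group-demo").getOrCreate()
val sc = spark.sparkContext

val groupId = java.util.UUID.randomUUID.toString
// Jobs submitted from this thread after this call belong to `groupId`;
// this is what STS does with the statementId.
sc.setJobGroup(groupId, "demo statement", interruptOnCancel = true)

// sc.parallelize(1 to 100000000).map(identity).count() // a job in the group

// Cancels every job in the group (callable from any thread):
sc.cancelJobGroup(groupId)
```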

@gengliangwang
Member

> hmm, IIRC the SQL UI can know all the jobs of a SQL statement, how is that done? cc @gengliangwang

For SQL execution, the SparkListenerJobStart event contains the SQL execution ID, so Spark can associate the jobs with the SQL execution in SQLAppStatusListener.onJobStart.
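
As a sketch of that association (the property key "spark.sql.execution.id" is the one the job start event carries for SQL queries):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Sketch: the job start event carries the submitting thread's local
// properties, including the SQL execution id, which listeners use to map
// jobs back to the SQL execution.
class ExecutionIdListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
      .foreach(id => println(s"Job ${jobStart.jobId} belongs to SQL execution $id"))
  }
}
// Register with: spark.sparkContext.addSparkListener(new ExecutionIdListener)
```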

@cloud-fan
Contributor

cloud-fan commented Jan 12, 2021

Cool, then it proves that the local property is a reliable way to track the jobs for a SQL query. BTW why doesn't STS use SQL execution ID to cancel the jobs for a SQL query? cc @wangyum @yaooqinn

@yaooqinn
Member

Nice catch.

> BTW why doesn't STS use SQL execution ID to cancel the jobs for a SQL query?

I have no idea. I guess the IDs are mainly used for the listeners (SQLAppStatusListener and HiveThriftServer2Listener) to capture and track different SQL states.

For canceling related jobs, I think either ID is OK to use, if we want them to be unified.

@cloud-fan
Contributor

@LantaoJin can you do a bit more manual test with broadcast inside a subquery?

@LantaoJin
Contributor Author

> @LantaoJin can you do a bit more manual test with broadcast inside a subquery?

Sure, let me have a try.

@LantaoJin
Contributor Author

LantaoJin commented Jan 13, 2021

> BTW why doesn't STS use SQL execution ID to cancel the jobs for a SQL query?

The statementId bound in SparkOperation can be accessed from two threads (HiveServer2-Handler-Pool and HiveServer2-Background-Pool); it is designed as a non-thread-local instance. STS uses it as the job group id (which is thread-local). The executionId, on the other hand, is always treated as a thread-local instance, though we could use it to do the same thing.
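
To illustrate the distinction, a minimal model of a shared field versus a thread-local value (simplified: Spark's local properties actually live in an InheritableThreadLocal, but a pre-existing pool thread, like HiveServer2's, still won't see a value set later on another thread):

```scala
import java.util.UUID
import java.util.concurrent.Executors

// Simplified model: a plain field (like statementId on the operation object)
// is visible from any thread, while a thread-local value (like the job group
// local property) is only visible on the thread that set it.
object FieldVsThreadLocal {
  val statementId: String = UUID.randomUUID.toString // shared field
  val jobGroup = new ThreadLocal[String]             // thread-local value

  def main(args: Array[String]): Unit = {
    jobGroup.set(statementId)                        // set on the main thread only
    val pool = Executors.newSingleThreadExecutor()   // models HiveServer2's pools
    pool.submit(new Runnable {
      def run(): Unit = {
        println(s"statementId: $statementId")            // visible here
        println(s"jobGroup:    ${Option(jobGroup.get)}") // None on this pool thread
      }
    }).get()
    pool.shutdown()
  }
}
```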

@wangyum
Member

wangyum commented Jan 13, 2021

The statementId was introduced by https://github.com/apache/spark/pull/3946/files#diff-84cb625cf9fbadddac3b710431009437e63de5211d3d43742dae49cbac95d1e4R207 to support thriftserver-ui.

Could we move this change to BroadcastExchangeExec.scala#L49?

@LantaoJin
Contributor Author

Another reason, I think, is that the executionId is generated too late: it is generated only when we invoke sqlContext.sql(). But the statementId is needed in the thrift server-client protocol part (what HiveServer2-Handler-Pool does); the statementId is used before the statement is performed. Just my own understanding.

@LantaoJin
Contributor Author

> Could we move this change to BroadcastExchangeExec.scala#L49?

Better not. BroadcastExchangeExec.scala#L49 is a def; we don't want to add any val to trait BroadcastExchangeLike, and we want to fix the value when the Spark plan is built, rather than evaluating it on an unpredictable thread.
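
The distinction, as a generic Scala sketch (names hypothetical; `readGroupId` stands in for reading the thread-local job group property):

```scala
// Hypothetical sketch of the val-vs-def timing difference described above.
class PlanNode(readGroupId: () => Option[String]) {
  // Evaluated exactly once, on the thread that builds the plan,
  // so the captured value is stable afterwards:
  val groupIdAtConstruction: Option[String] = readGroupId()

  // Re-evaluated on every call, on whichever thread happens to invoke it
  // (e.g. a broadcast-exchange thread with different local properties):
  def groupIdAtCall: Option[String] = readGroupId()
}
```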

@LantaoJin
Contributor Author

LantaoJin commented Jan 13, 2021

@cloud-fan I tested manually: a broadcast inside a subquery could be cancelled with this patch (I also tested the same query without the patch, and it cannot be cancelled).

The plan was like this:
(screenshot: Screen Shot 2021-01-13 at 3 06 35 PM, showing the query plan)

@SparkQA

SparkQA commented Jan 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38592/

@SparkQA

SparkQA commented Jan 13, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/38592/

@cloud-fan
Contributor

GA passed, merging to master/3.1! (This is a bug fix of the SQL statement canceling feature in Spark thrift server)

@cloud-fan cloud-fan closed this in f1b21ba Jan 13, 2021
cloud-fan pushed a commit that referenced this pull request Jan 13, 2021
…tement is cancelled

### What changes were proposed in this pull request?
#24595 introduced `private val runId: UUID = UUID.randomUUID` in `BroadcastExchangeExec` to cancel the broadcast execution in the Future when a timeout happens. Since the `runId` is a random UUID instead of inheriting the job group id, the broadcast sub-jobs keep executing after a SQL statement is cancelled. This PR uses the job group id of the outer thread as the `runId` to abort these broadcast sub-jobs when the SQL statement is cancelled.

### Why are the changes needed?
When broadcasting a table takes too long, the SQL statement may be cancelled; however, the background Spark job keeps running and wastes resources.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.
Broadcasting a table completes too quickly to cancel in a unit test, but it is easy to verify manually:
1. Start a Spark thrift-server with limited resources in YARN.
2. While the driver is running but no executors have launched, submit from beeline a SQL statement that broadcasts tables.
3. Cancel the SQL statement in beeline.

Without the patch, broadcast sub-jobs won't be cancelled.
![Screen Shot 2021-01-11 at 12 03 13 PM](https://user-images.githubusercontent.com/1853780/104150975-ab024b00-5416-11eb-8bf9-b5167bdad80a.png)

With this patch, broadcast sub-jobs will be cancelled.
![Screen Shot 2021-01-11 at 11 43 40 AM](https://user-images.githubusercontent.com/1853780/104150994-be151b00-5416-11eb-80ff-313d423c8a2e.png)

Closes #31119 from LantaoJin/SPARK-34064.

Authored-by: LantaoJin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit f1b21ba)
Signed-off-by: Wenchen Fan <[email protected]>
@SparkQA

SparkQA commented Jan 13, 2021

Test build #134004 has finished for PR 31119 at commit b3f1453.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```diff
-  override val runId: UUID = UUID.randomUUID
+  // Cancelling a SQL statement from Spark ThriftServer needs to cancel
+  // its related broadcast sub-jobs. So set the run id to job group id if exists.
+  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+      .map(UUID.fromString).getOrElse(UUID.randomUUID)
```
Contributor

After a second thought, I think this is risky. It's possible that in a non-STS environment, users set the job group id manually and run some long-running jobs. If we capture the job group id here in the broadcast exchange, then when the broadcast times out it will cancel the whole job group, which may kill the user's other long-running jobs unexpectedly.

I think we need to revisit the STS's SQL statement canceling feature. We should use SQL execution ID to find out all the jobs of a SQL query, and assign a unique job group id to them.
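
A rough sketch of that direction (illustrative only, not the eventual fix): use a listener to record which jobs belong to each SQL execution, then cancel them individually.

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Illustrative sketch: map each SQL execution id to the jobs it submitted,
// so a statement cancel can reach all of them, broadcasts included.
class ExecutionJobTracker(sc: SparkContext) extends SparkListener {
  private val jobsByExecution = TrieMap.empty[String, Set[Int]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
      .foreach { execId =>
        // Listener events are delivered serially, so this read-modify-write is safe here.
        val jobs = jobsByExecution.getOrElse(execId, Set.empty[Int])
        jobsByExecution.put(execId, jobs + jobStart.jobId)
      }
  }

  def cancelExecution(execId: String): Unit =
    jobsByExecution.getOrElse(execId, Set.empty[Int]).foreach(id => sc.cancelJob(id))
}
```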

Contributor

@LantaoJin what do you think?

Contributor Author

@cloud-fan yes, the case you describe is a problem in the current implementation. I will open a new PR. Revert this one first?

Contributor Author

Job group id is still a basic API, used to cancel a group of jobs (depending on custom business logic). In a non-STS environment, users can set the job group id manually and run some long-running jobs. In some cases, such as on a custom exception, the user may want to cancel all jobs with the same job group. A broadcast timeout shouldn't use the job group id to cancel the broadcast job.

Contributor

@cloud-fan cloud-fan Jan 15, 2021

Let me revert this first. Please let me know when you have a new fix, thanks!

Member

With a conf (maybe named spark.jobGroupID.inherited) to decide whether the runId is re-generated or inherited from the one specified earlier. Users may develop ThriftServer-like applications in a client/server architecture, as a server-like Spark program.

Contributor Author

Sorry, I don't get the point. If a user sets spark.jobGroupID.inherited to true and sets a custom jobGroupId to a UUID value, what is the behavior when the broadcast times out?

Contributor Author

Do you mean this?

```scala
// Sketch from the discussion; `jobGroupIdInherited` stands for the proposed
// spark.jobGroupID.inherited conf. When true, the runId inherits the job
// group id; otherwise a fresh UUID is generated.
override val runId: UUID =
  if (SQLConf.get.getConf(jobGroupIdInherited)) {
    UUID.fromString(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
  } else {
    UUID.randomUUID
  }
```

Member

yea, something like this

Contributor Author

OK, I see. To be transparent to users, how about adding a new thread-local property, SparkContext.SPARK_RESERVED_JOB_GROUP_ID or SPARK_THRIFTSERVER_JOB_GROUP_ID, to separate it?

```diff
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f6e8a5694d..cc3efed713 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -760,9 +760,13 @@ class SparkContext(config: SparkConf) extends Logging {
    * may respond to Thread.interrupt() by marking nodes as dead.
    */
   def setJobGroup(groupId: String,
-      description: String, interruptOnCancel: Boolean = false): Unit = {
+      description: String, interruptOnCancel: Boolean = false, reserved: Boolean = false): Unit = {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
-    setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    if (reserved) {
+      setLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID, groupId)
+    } else {
+      setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    }
     // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
     // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
     // APIs to also take advantage of this property (e.g., internal job failures or canceling from
@@ -2760,6 +2764,7 @@ object SparkContext extends Logging {

   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
+  private[spark] val SPARK_RESERVED_JOB_GROUP_ID = "spark.reservedJobGroup.id"
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
   private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c322d5eef5..25abb4f2d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -76,7 +76,8 @@ case class BroadcastExchangeExec(

   // Cancelling a SQL statement from Spark ThriftServer needs to cancel
   // its related broadcast sub-jobs. So set the run id to job group id if exists.
-  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+  override val runId: UUID =
+    Option(sparkContext.getLocalProperty(SparkContext.SPARK_RESERVED_JOB_GROUP_ID))
       .map(UUID.fromString).getOrElse(UUID.randomUUID)

   override lazy val metrics = Map(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8ca0ab91a7..4db50e8d00 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -286,7 +286,7 @@ private[hive] class SparkExecuteStatementOperation(
         parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
       }

-      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
+      sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel, true)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
       HiveThriftServer2.eventManager.onStatementParsed(statementId,
```

@cloud-fan
Contributor

reverted.

@LantaoJin
Contributor Author

@cloud-fan, it is a little hard to use the execution id as the group id, since SparkExecuteStatementOperation has no chance to get the execution id before executing, and the statement id (UUID) used as the job group id looks very self-consistent in STS. So a simple way to fix this is to know who set the group id: if the group id was set by STS, use it as the runId; otherwise use a random UUID as the runId.

@LantaoJin
Contributor Author

```diff
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 50cc47d0f8..3d2827ecb7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -977,6 +977,15 @@ object SQLConf {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefault(0L)

+  val THRIFTSERVER_BROADCAST_CANCEL =
+    buildConf("spark.sql.thriftServer.broadcastCancel")
+      .internal()
+      .doc("When true, cancel the related broadcast sub-jobs when SQL statement is cancelled. " +
+        "This configuration is only used internally and don't set it manually.")
+      .version("3.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val THRIFTSERVER_UI_STATEMENT_LIMIT =
     buildConf("spark.sql.thriftserver.ui.retainedStatements")
       .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c322d5eef5..39ea035d62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -76,8 +76,12 @@ case class BroadcastExchangeExec(

   // Cancelling a SQL statement from Spark ThriftServer needs to cancel
   // its related broadcast sub-jobs. So set the run id to job group id if exists.
-  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
-      .map(UUID.fromString).getOrElse(UUID.randomUUID)
+  override val runId: UUID =
+    if (SQLConf.get.getConf(SQLConf.THRIFTSERVER_BROADCAST_CANCEL)) {
+      UUID.fromString(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    } else {
+      UUID.randomUUID
+    }

   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 8ca0ab91a7..68a02842ef 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -285,7 +285,7 @@ private[hive] class SparkExecuteStatementOperation(
       if (!runInBackground) {
         parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
       }
-
+      sqlContext.conf.setConf(SQLConf.THRIFTSERVER_BROADCAST_CANCEL, true)
       sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
       result = sqlContext.sql(statement)
       logDebug(result.queryExecution.toString())
```

@cloud-fan
Contributor

Can we introduce a new special local property? The semantics can be: if it's set, all the jobs submitted by SQL queries use it as the job group id, regardless of what the current job group id in the local properties is. Then STS can set that special local property.
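
A rough sketch of those semantics (the property key and helper are hypothetical):

```scala
import org.apache.spark.SparkContext

// Hypothetical sketch of the proposal; the key and helper name are made up.
object SqlJobGroup {
  val SPARK_SQL_JOB_GROUP_ID = "spark.sql.jobGroup.id" // hypothetical key

  // If the special property is set, it wins over the regular job group id
  // ("spark.jobGroup.id") for jobs submitted by SQL queries.
  def effectiveGroupId(sc: SparkContext): Option[String] =
    Option(sc.getLocalProperty(SPARK_SQL_JOB_GROUP_ID))
      .orElse(Option(sc.getLocalProperty("spark.jobGroup.id")))
}
```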
