[SPARK-20774][SPARK-27036][SQL] Cancel the running broadcast execution on BroadcastTimeout #24595
Conversation
Test build #105370 has finished for PR 24595 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
sql/core/src/test/scala/org/apache/spark/sql/execution/BroadcastExchangeSuite.scala
This has a narrower scope compared to #24036, and is easier to reason about. +1

Test build #105393 has finished for PR 24595 at commit
```scala
// with the correct execution.
SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
  try {
    sparkContext.setJobGroup(runId.toString, s"broadcast exchange (runId $runId)",
```
Let's add a comment to explain why we set up a job group here. There is no other public API that can cancel a specific job, AFAIK.
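For context, the job-group API being referred to is the only stable public handle for cancelling a specific set of jobs. A minimal sketch, assuming a running `SparkContext` named `sc` (the group id string here is purely illustrative):

```scala
// Tag all jobs submitted from this thread with a group id.
// interruptOnCancel = true asks Spark to interrupt the task threads on cancel.
sc.setJobGroup("broadcast-exchange-runId-1234", "broadcast exchange",
  interruptOnCancel = true)

// ... submit the broadcast job from this thread ...

// Later, from any thread, cancel everything tagged with that group id:
sc.cancelJobGroup("broadcast-exchange-runId-1234")
```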
Just curious: why can't we inherit the job group id of the outer thread, so that when the SQL statement is cancelled, these broadcast sub-jobs are cancelled as a whole?
That sounds like a good idea. We should only set the job group if none is set outside.
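A sketch of that suggestion (hypothetical, not the merged code; it assumes `SparkContext.SPARK_JOB_GROUP_ID` is the local-property key for the current thread's job group, which is `private[spark]`):

```scala
// Reuse the caller's job group if one is already set; otherwise fall back
// to a fresh runId so the timeout path can still cancel the broadcast job.
val inheritedGroupId = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID)
val effectiveGroupId =
  if (inheritedGroupId != null) inheritedGroupId else runId.toString
sparkContext.setJobGroup(effectiveGroupId,
  s"broadcast exchange (runId $runId)", interruptOnCancel = true)
```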
Wouldn't cancelling the broadcast job also cancel the outer main job?
> We should only set the job group if none is set outside.

And I guess it would be a partial fix?
It's always better to have fewer configs if possible. And I don't think we can override the job group id here if the config is true, as it is used to cancel the broadcast after a timeout.
@HyukjinKwon @jiangxb1987 @yeshengm What's your opinion of my idea?
I am discussing multiple job group support, which would fundamentally fix all these problems. This is actually a general problem that's not specific to SQL broadcast.
@HyukjinKwon Could you please tell me where this is being discussed? I also want to make a small contribution.
Sorry, I am discussing it offline first. I will send out an email or file a JIRA soon for a more open discussion.
```diff
  override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
    try {
-     ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+     relationFuture.get(timeout.toSeconds, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]]
```
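The switch from `ThreadUtils.awaitResult` to `java.util.concurrent.Future.get` matters because the `Future` handle can also be cancelled with interruption. A hedged sketch of how the timeout path can then tear things down (the exact exception wrapping in Spark may differ):

```scala
try {
  relationFuture.get(timeout.toSeconds, TimeUnit.SECONDS)
    .asInstanceOf[broadcast.Broadcast[T]]
} catch {
  case ex: TimeoutException =>
    // Cancel the Spark jobs tagged with this exchange's job group ...
    sparkContext.cancelJobGroup(runId.toString)
    // ... and interrupt the thread building the hashed relation.
    relationFuture.cancel(true)
    throw new SparkException(
      s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
}
```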
Also fix -1 too?
Test build #105425 has finished for PR 24595 at commit
gatorsmile left a comment:

Thanks! Merged to master.
…n on BroadcastTimeout

In the existing code, a broadcast execution timeout for the Future only causes a query failure, but the job running with the broadcast and the computation in the Future are not canceled. This wastes resources and slows down the other jobs. This PR tries to cancel both the running job and the running hashed relation construction thread.

Add new test suite `BroadcastExchangeSuite`.

Closes apache#24595 from jiangxb1987/SPARK-20774.

Authored-by: Xingbo Jiang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
@jiangxb1987 Just out of curiosity, the timeout mechanism was added because we have no way to track the lineage of such broadcast sub-jobs, right? Even if the "main" RDD action is cancelled, these broadcast jobs running in other threads will just keep running until they fail or time out (which was added in this diff).
…tement is cancelled

### What changes were proposed in this pull request?

#24595 introduced `private val runId: UUID = UUID.randomUUID` in `BroadcastExchangeExec` to cancel the broadcast execution in the Future when a timeout happens. Since the runId is a random UUID instead of inheriting the job group id, when a SQL statement is cancelled these broadcast sub-jobs keep executing. This PR uses the job group id of the outer thread as the `runId`, so these broadcast sub-jobs are aborted when the SQL statement is cancelled.

### Why are the changes needed?

When broadcasting a table takes too long and the SQL statement is cancelled, the background Spark job keeps running and wastes resources.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test. Broadcasting a table is too fast to cancel in a unit test, but it is easy to verify manually:

1. Start a Spark thrift-server with limited resources in YARN.
2. While the driver is running but no executors have launched, submit a SQL statement from beeline that broadcasts tables.
3. Cancel the SQL statement in beeline.

Without the patch, broadcast sub-jobs are not cancelled. With this patch, broadcast sub-jobs are cancelled.

Closes #31119 from LantaoJin/SPARK-34064.

Authored-by: LantaoJin <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
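The follow-up fix described above can be sketched as follows (a sketch only, with an illustrative property key; the actual patch lives in `BroadcastExchangeExec`):

```scala
// Inherit the outer thread's job group id when present, so that cancelling
// the SQL statement also cancels these broadcast sub-jobs. Fall back to a
// random UUID to preserve the timeout-cancellation behaviour from #24595.
val runId: String = Option(sparkContext.getLocalProperty("spark.jobGroup.id"))
  .getOrElse(UUID.randomUUID.toString)
```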
@jiangxb1987 @cloud-fan @yeshengm I just deleted the following two lines of code to solve this bug:

Could you tell me the necessity of setJobGroup here? It overrides the job group and job description configured in the user code.
Then I think we'd have to wait for the broadcast job to finish even when job cancellation is triggered for the main job, right?
Thank you for your attention. I mean that even if it should support cancellation, it should not overwrite the user's configuration. I'm trying to find a better approach to solve this problem.
I think maybe we should have a way to set multiple job groups, actually. That would make everything easier.
@Shockang what do you think about this proposal? https://github.com/apache/spark/pull/24595/files#r667590820
Sorry, I've been busy these past several days. You can take a look at my suggestions.
What changes were proposed in this pull request?

In the existing code, a broadcast execution timeout for the Future only causes a query failure, but the job running with the broadcast and the computation in the Future are not canceled. This wastes resources and slows down the other jobs. This PR tries to cancel both the running job and the running hashed relation construction thread.

How was this patch tested?

Added a new test suite, `BroadcastExchangeSuite`.